# SPDX-License-Identifier: Apache-2.0 # The OpenSearch Contributors require contributions made to # this file be licensed under the Apache-2.0 license or a # compatible open source license. # Any modifications Copyright OpenSearch Contributors. See # GitHub history for details. # Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
"""Integration tests for ``opensearch_py_ml.pandas_to_opensearch``.

Each test writes pandas DataFrames into the ``test-index`` index through
``OPENSEARCH_TEST_CLIENT`` and checks the outcome: the returned DataFrame,
the error raised, or the mapping stored in the index.
"""

from datetime import datetime, timedelta, timezone

import pandas as pd
import pytest
from opensearchpy.helpers import BulkIndexError

from opensearch_py_ml import DataFrame, pandas_to_opensearch
from tests.common import (
    OPENSEARCH_TEST_CLIENT,
    assert_frame_equal,
    assert_pandas_opensearch_py_ml_frame_equal,
)

# Naive datetime holding the current UTC time. ``datetime.utcnow()`` is
# deprecated since Python 3.12; this expression produces the same naive value.
dt = datetime.now(timezone.utc).replace(tzinfo=None)

# Base frame: one int, one float, one string and one datetime column,
# indexed by string labels.
pd_df = pd.DataFrame(
    {
        "a": [1, 2, 3],
        "b": [1.0, 2.0, 3.0],
        "c": ["A", "B", "C"],
        "d": [dt, dt + timedelta(1), dt + timedelta(2)],
    },
    index=["0", "1", "2"],
)
# Frame whose schema deliberately differs from ``pd_df``: it adds a 'Z'
# column and stores strings in 'a' instead of integers.
pd_df2 = pd.DataFrame({"Z": [3, 2, 1], "a": ["C", "D", "E"]}, index=["0", "1", "2"])


@pytest.fixture(scope="function", autouse=True)
def delete_test_index():
    """Delete ``test-index`` before and after every test, ignoring 404s."""
    OPENSEARCH_TEST_CLIENT.indices.delete(index="test-index", ignore=404)
    yield
    OPENSEARCH_TEST_CLIENT.indices.delete(index="test-index", ignore=404)


class TestPandasToEland:
    """Tests for ``pandas_to_opensearch`` against a live ``test-index``."""

    def test_returns_oml_dataframe(self):
        """The call returns an opensearch_py_ml DataFrame bound to the index."""
        df = pandas_to_opensearch(
            pd_df, os_client=OPENSEARCH_TEST_CLIENT, os_dest_index="test-index"
        )

        assert isinstance(df, DataFrame)
        assert "os_index_pattern: test-index" in df.os_info()

    def test_os_if_exists_fail(self):
        """Without ``os_if_exists``, writing to an existing index raises."""
        pandas_to_opensearch(
            pd_df, os_client=OPENSEARCH_TEST_CLIENT, os_dest_index="test-index"
        )

        with pytest.raises(ValueError) as e:
            pandas_to_opensearch(
                pd_df, os_client=OPENSEARCH_TEST_CLIENT, os_dest_index="test-index"
            )

        # Checked outside the ``with`` block: any statement after the
        # raising call inside it would never execute.
        assert str(e.value) == (
            "Could not create the index [test-index] because it "
            "already exists. Change the 'os_if_exists' parameter "
            "to 'append' or 'replace' data."
        )

    def test_os_if_exists_replace(self):
        """``os_if_exists="replace"`` creates the index or fully overwrites it."""
        # 'replace' may create an index that does not exist yet.
        df1 = pandas_to_opensearch(
            pd_df2,
            os_client=OPENSEARCH_TEST_CLIENT,
            os_dest_index="test-index",
            os_if_exists="replace",
            os_refresh=True,
        ).to_pandas()
        assert_frame_equal(pd_df2, df1)

        # 'replace' swaps out both the existing mapping and the documents.
        df2 = pandas_to_opensearch(
            pd_df,
            os_client=OPENSEARCH_TEST_CLIENT,
            os_dest_index="test-index",
            os_if_exists="replace",
            os_refresh=True,
        )
        assert_pandas_opensearch_py_ml_frame_equal(pd_df, df2)

        # Replacing back with the first frame reproduces the first result,
        # so nothing from the intermediate write is left behind.
        df3 = pandas_to_opensearch(
            pd_df2,
            os_client=OPENSEARCH_TEST_CLIENT,
            os_dest_index="test-index",
            os_if_exists="replace",
            os_refresh=True,
        ).to_pandas()
        assert_frame_equal(df1, df3)

    def test_os_if_exists_append(self):
        """``os_if_exists="append"`` adds rows to a compatible existing index."""
        df1 = pandas_to_opensearch(
            pd_df,
            os_client=OPENSEARCH_TEST_CLIENT,
            os_dest_index="test-index",
            os_if_exists="append",
            os_refresh=True,
            # We use 'short' here specifically so that the
            # assumed type of 'long' is coerced into a 'short'
            # by append mode.
            os_type_overrides={"a": "short"},
        )
        assert_pandas_opensearch_py_ml_frame_equal(pd_df, df1)
        assert df1.shape == (3, 4)

        # Same schema as ``pd_df``, new index labels.
        pd_df_more = pd.DataFrame(
            {
                "a": [4, 5, 6],
                "b": [-1.0, -2.0, -3.0],
                "c": ["A", "B", "C"],
                "d": [dt, dt - timedelta(1), dt - timedelta(2)],
            },
            index=["3", "4", "5"],
        )
        df2 = pandas_to_opensearch(
            pd_df_more,
            os_client=OPENSEARCH_TEST_CLIENT,
            os_dest_index="test-index",
            os_if_exists="append",
            os_refresh=True,
        )

        # Assert that the second pandas dataframe is actually appended
        assert df2.shape == (6, 4)
        pd_df_combined = pd.concat([pd_df, pd_df_more])
        assert_pandas_opensearch_py_ml_frame_equal(pd_df_combined, df2)

    def test_os_if_exists_append_mapping_mismatch_schema_enforcement(self):
        """Appending an incompatible frame raises and leaves the index intact."""
        df1 = pandas_to_opensearch(
            pd_df,
            os_client=OPENSEARCH_TEST_CLIENT,
            os_dest_index="test-index",
            os_if_exists="append",
            os_refresh=True,
        )

        with pytest.raises(ValueError) as e:
            pandas_to_opensearch(
                pd_df2,
                os_client=OPENSEARCH_TEST_CLIENT,
                os_dest_index="test-index",
                os_if_exists="append",
            )

        assert str(e.value) == (
            "DataFrame dtypes and OpenSearch index mapping aren't compatible:\n"
            "- 'b' is missing from DataFrame columns\n"
            "- 'c' is missing from DataFrame columns\n"
            "- 'd' is missing from DataFrame columns\n"
            "- 'Z' is missing from opensearch index mapping\n"
            "- 'a' column type ('keyword') not compatible with opensearch index mapping type ('long')"
        )

        # Assert that the index isn't modified
        assert_pandas_opensearch_py_ml_frame_equal(pd_df, df1)

    def test_os_if_exists_append_mapping_mismatch_no_schema_enforcement(self):
        """With mapping verification disabled, a mismatched frame is appended."""
        pandas_to_opensearch(
            pd_df,
            os_client=OPENSEARCH_TEST_CLIENT,
            os_dest_index="test-index",
            os_if_exists="append",
            os_refresh=True,
        )

        # Drops column 'c' and introduces a new column 'e'.
        pd_df_extra_col = pd.DataFrame(
            {
                "a": [4, 5, 6],
                "b": [-1.0, -2.0, -3.0],
                "d": [dt, dt - timedelta(1), dt - timedelta(2)],
                "e": ["A", "B", "C"],
            },
            index=["3", "4", "5"],
        )

        pandas_to_opensearch(
            pd_df_extra_col,
            os_client=OPENSEARCH_TEST_CLIENT,
            os_dest_index="test-index",
            os_if_exists="append",
            os_refresh=True,
            os_verify_mapping_compatibility=False,
        )

        final_df = pd.DataFrame(
            {
                "a": [1, 2, 3, 4, 5, 6],
                "b": [1.0, 2.0, 3.0, -1.0, -2.0, -3.0],
                "c": ["A", "B", "C", None, None, None],
                "d": [
                    dt,
                    dt + timedelta(1),
                    dt + timedelta(2),
                    dt,
                    dt - timedelta(1),
                    dt - timedelta(2),
                ],
                "e": [None, None, None, "A", "B", "C"],
            },
            index=["0", "1", "2", "3", "4", "5"],
        )

        oml_df = DataFrame(OPENSEARCH_TEST_CLIENT, "test-index")
        # Assert that the index now holds both batches; a column absent from
        # one batch reads back as None for that batch's rows.
        assert_pandas_opensearch_py_ml_frame_equal(final_df, oml_df)

    def test_os_if_exists_append_os_type_coerce_error(self):
        """A value out of range for an overridden type fails at bulk-index time."""
        df1 = pandas_to_opensearch(
            pd_df,
            os_client=OPENSEARCH_TEST_CLIENT,
            os_dest_index="test-index",
            os_if_exists="append",
            os_refresh=True,
            os_type_overrides={"a": "byte"},
        )
        assert_pandas_opensearch_py_ml_frame_equal(pd_df, df1)

        pd_df_out_of_range = pd.DataFrame(
            {
                "a": [128],  # This value is too large for 'byte'
                "b": [-1.0],
                "c": ["A"],
                "d": [dt],
            },
            index=["3"],
        )

        with pytest.raises(BulkIndexError) as e:
            pandas_to_opensearch(
                pd_df_out_of_range,
                os_client=OPENSEARCH_TEST_CLIENT,
                os_dest_index="test-index",
                os_if_exists="append",
            )

        # Assert that the value 128 caused the index error
        assert "Value [128] is out of range for a byte" in str(e.value.errors)

    def test_pandas_to_oml_text_inserts_keyword(self):
        """A plain ``"text"`` override gets a ``keyword`` sub-field; a dict one does not."""
        client = OPENSEARCH_TEST_CLIENT
        df1 = pandas_to_opensearch(
            pd_df,
            os_client=client,
            os_dest_index="test-index",
            os_if_exists="append",
            os_refresh=True,
            os_type_overrides={
                "c": "text",
                "b": {"type": "float"},
                "d": {"type": "text"},
            },
        )
        assert client.indices.get_mapping(index="test-index") == {
            "test-index": {
                "mappings": {
                    "properties": {
                        "a": {"type": "long"},
                        "b": {"type": "float"},
                        "c": {
                            "fields": {"keyword": {"type": "keyword"}},
                            "type": "text",
                        },
                        "d": {"type": "text"},
                    }
                }
            }
        }

        # 'c' is aggregatable on 'keyword'
        assert df1.groupby("c").mean().to_dict() == {
            "a": {"A": 1.0, "B": 2.0, "C": 3.0},
            "b": {"A": 1.0, "B": 2.0, "C": 3.0},
        }

        # 'd' isn't aggregatable because it's missing the 'keyword'
        with pytest.raises(ValueError) as e:
            df1.groupby("d").mean()

        assert str(e.value) == (
            "Cannot use 'd' with groupby() because it has "
            "no aggregatable fields in Opensearch"
        )