# mypy: disable-error-code=name-defined
"""Amazon OpenSearch Read Module (PRIVATE)."""
from typing import Any, Collection, Dict, List, Mapping, Optional, Union
import awswrangler.pandas as pd
from awswrangler import _utils, exceptions
from awswrangler.opensearch._utils import _get_distribution, _is_serverless
opensearchpy = _utils.import_optional_dependency("opensearchpy")
def _resolve_fields(row: Mapping[str, Any]) -> Mapping[str, Any]:
fields = {}
for field in row:
if isinstance(row[field], dict):
nested_fields = _resolve_fields(row[field])
for n_field, val in nested_fields.items():
fields[f"{field}.{n_field}"] = val
else:
fields[field] = row[field]
return fields
def _hit_to_row(hit: Mapping[str, Any]) -> Mapping[str, Any]:
row: Dict[str, Any] = {}
for k in hit.keys():
if k == "_source":
solved_fields = _resolve_fields(hit["_source"])
row.update(solved_fields)
elif k.startswith("_"):
row[k] = hit[k]
return row
def _search_response_to_documents(response: Mapping[str, Any]) -> List[Mapping[str, Any]]:
return [_hit_to_row(hit) for hit in response.get("hits", {}).get("hits", [])]
def _search_response_to_df(response: Union[Mapping[str, Any], Any]) -> pd.DataFrame:
return pd.DataFrame(_search_response_to_documents(response))
@_utils.check_optional_dependency(opensearchpy, "opensearchpy")
def search(
client: "opensearchpy.OpenSearch",
index: Optional[str] = "_all",
search_body: Optional[Dict[str, Any]] = None,
doc_type: Optional[str] = None,
is_scroll: Optional[bool] = False,
filter_path: Optional[Union[str, Collection[str]]] = None,
**kwargs: Any,
) -> pd.DataFrame:
"""Return results matching query DSL as pandas DataFrame.
Parameters
----------
client : OpenSearch
instance of opensearchpy.OpenSearch to use.
index : str, optional
A comma-separated list of index names to search.
use `_all` or empty string to perform the operation on all indices.
search_body : Dict[str, Any], optional
The search definition using the `Query DSL `_.
doc_type : str, optional
Name of the document type (for Elasticsearch versions 5.x and earlier).
is_scroll : bool, optional
Allows to retrieve a large numbers of results from a single search request using
`scroll `_
for example, for machine learning jobs.
Because scroll search contexts consume a lot of memory, we suggest you don’t use the scroll operation
for frequent user queries.
filter_path : Union[str, Collection[str]], optional
Use the filter_path parameter to reduce the size of the OpenSearch Service response \
(default: ['hits.hits._id','hits.hits._source'])
**kwargs :
KEYWORD arguments forwarded to `opensearchpy.OpenSearch.search \
`_
and also to `opensearchpy.helpers.scan `_
if `is_scroll=True`
Returns
-------
Union[pandas.DataFrame, Iterator[pandas.DataFrame]]
Results as Pandas DataFrame
Examples
--------
Searching an index using query DSL
>>> import awswrangler as wr
>>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
>>> df = wr.opensearch.search(
... client=client,
... index='movies',
... search_body={
... "query": {
... "match": {
... "title": "wind"
... }
... }
... }
... )
"""
if is_scroll and _is_serverless(client):
raise exceptions.NotSupported("Scrolled search is not currently available for OpenSearch Serverless.")
if doc_type:
kwargs["doc_type"] = doc_type
if filter_path is None:
filter_path = ["hits.hits._id", "hits.hits._source"]
if is_scroll:
if isinstance(filter_path, str):
filter_path = [filter_path]
filter_path = ["_scroll_id", "_shards"] + list(filter_path) # required for scroll
documents_generator = opensearchpy.helpers.scan(
client, index=index, query=search_body, filter_path=filter_path, **kwargs
)
documents = [_hit_to_row(doc) for doc in documents_generator]
df = pd.DataFrame(documents)
else:
response = client.search(index=index, body=search_body, filter_path=filter_path, **kwargs)
df = _search_response_to_df(response)
return df
@_utils.check_optional_dependency(opensearchpy, "opensearchpy")
def search_by_sql(client: "opensearchpy.OpenSearch", sql_query: str, **kwargs: Any) -> pd.DataFrame:
"""Return results matching `SQL query `_ as pandas DataFrame.
Parameters
----------
client : OpenSearch
instance of opensearchpy.OpenSearch to use.
sql_query : str
SQL query
**kwargs :
KEYWORD arguments forwarded to request url (e.g.: filter_path, etc.)
Returns
-------
Union[pandas.DataFrame, Iterator[pandas.DataFrame]]
Results as Pandas DataFrame
Examples
--------
Searching an index using SQL query
>>> import awswrangler as wr
>>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
>>> df = wr.opensearch.search_by_sql(
>>> client=client,
>>> sql_query='SELECT * FROM my-index LIMIT 50'
>>> )
"""
if _is_serverless(client):
raise exceptions.NotSupported("SQL plugin is not currently available for OpenSearch Serverless.")
if _get_distribution(client) == "opensearch":
url = "/_plugins/_sql"
else:
url = "/_opendistro/_sql"
kwargs["format"] = "json"
body = {"query": sql_query}
for size_att in ["size", "fetch_size"]:
if size_att in kwargs:
body["fetch_size"] = kwargs[size_att]
del kwargs[size_att] # unrecognized parameter
response = client.transport.perform_request(
"POST", url, headers={"content-type": "application/json"}, body=body, params=kwargs
)
df = _search_response_to_df(response)
return df