# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""The FeatureGroup entity for FeatureStore.

A feature group is a logical grouping of features, defined in the Feature Store,
to describe records. A feature group definition is composed of a list of feature
definitions, a record identifier name, and configurations for its online and offline
store. The create, describe, update, delete, and list feature group APIs can be used
to manage feature groups.
"""
from __future__ import absolute_import

import copy
import logging
import math
import os
import tempfile
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
from typing import Sequence, List, Dict, Any, Union
from urllib.parse import urlparse
from multiprocessing.pool import AsyncResult
import signal

import attr
import pandas as pd
from pandas import DataFrame
import boto3
from botocore.config import Config
from pathos.multiprocessing import ProcessingPool

from sagemaker.config import (
    FEATURE_GROUP_ROLE_ARN_PATH,
    FEATURE_GROUP_OFFLINE_STORE_KMS_KEY_ID_PATH,
    FEATURE_GROUP_ONLINE_STORE_KMS_KEY_ID_PATH,
)
from sagemaker.session import Session
from sagemaker.feature_store.feature_definition import (
    FeatureDefinition,
    FeatureTypeEnum,
)
from sagemaker.feature_store.inputs import (
    OnlineStoreConfig,
    OnlineStoreSecurityConfig,
    S3StorageConfig,
    OfflineStoreConfig,
    DataCatalogConfig,
    FeatureValue,
    FeatureParameter,
    TableFormatEnum,
    DeletionModeEnum,
    TtlDuration,
    OnlineStoreConfigUpdate,
)
from sagemaker.utils import resolve_value_from_config

logger = logging.getLogger(__name__)


@attr.s
class AthenaQuery:
    """Class to manage querying of feature store data with AWS Athena.

    This class instantiates an AthenaQuery object that is used to retrieve data from
    the feature store via standard SQL queries.

    Attributes:
        catalog (str): name of the data catalog.
        database (str): name of the database.
        table_name (str): name of the table.
        sagemaker_session (Session): instance of the Session class to perform boto calls.
    """

    catalog: str = attr.ib()
    database: str = attr.ib()
    table_name: str = attr.ib()
    sagemaker_session: Session = attr.ib()
    _current_query_execution_id: str = attr.ib(init=False, default=None)
    _result_bucket: str = attr.ib(init=False, default=None)
    _result_file_prefix: str = attr.ib(init=False, default=None)

    def run(
        self, query_string: str, output_location: str, kms_key: str = None, workgroup: str = None
    ) -> str:
        """Execute a SQL query given a query string, output location, and KMS key.

        This method executes the SQL query using Athena, writes the results to
        ``output_location``, and returns the execution ID of the query.

        Args:
            query_string: SQL query string.
            output_location: S3 URI of the query result.
            kms_key: KMS key id. If set, will be used to encrypt the query result file.
            workgroup (str): The name of the workgroup in which the query is being started.

        Returns:
            Execution id of the query.
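
        Example:
            A minimal usage sketch; the S3 output location below is a placeholder, and the
            feature group is assumed to already have an offline store::

                athena_query = feature_group.athena_query()
                athena_query.run(
                    query_string=f'SELECT * FROM "{athena_query.table_name}"',
                    output_location="s3://my-bucket/query-results/",
                )
                athena_query.wait()
                df = athena_query.as_dataframe()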
""" response = self.sagemaker_session.start_query_execution( catalog=self.catalog, database=self.database, query_string=query_string, output_location=output_location, kms_key=kms_key, workgroup=workgroup, ) self._current_query_execution_id = response["QueryExecutionId"] parse_result = urlparse(output_location, allow_fragments=False) self._result_bucket = parse_result.netloc self._result_file_prefix = parse_result.path.strip("/") return self._current_query_execution_id def wait(self): """Wait for the current query to finish.""" self.sagemaker_session.wait_for_athena_query( query_execution_id=self._current_query_execution_id ) def get_query_execution(self) -> Dict[str, Any]: """Get execution status of the current query. Returns: Response dict from Athena. """ return self.sagemaker_session.get_query_execution( query_execution_id=self._current_query_execution_id ) def as_dataframe(self) -> DataFrame: """Download the result of the current query and load it into a DataFrame. Returns: A pandas DataFrame contains the query result. """ query_state = self.get_query_execution().get("QueryExecution").get("Status").get("State") if query_state != "SUCCEEDED": if query_state in ("QUEUED", "RUNNING"): raise RuntimeError( f"Current query {self._current_query_execution_id} is still being executed." ) raise RuntimeError(f"Failed to execute query {self._current_query_execution_id}") output_filename = os.path.join( tempfile.gettempdir(), f"{self._current_query_execution_id}.csv" ) self.sagemaker_session.download_athena_query_result( bucket=self._result_bucket, prefix=self._result_file_prefix, query_execution_id=self._current_query_execution_id, filename=output_filename, ) return pd.read_csv(output_filename, delimiter=",") @attr.s class IngestionManagerPandas: """Class to manage the multi-threaded data ingestion process. This class will manage the data ingestion process which is multi-threaded. Attributes: feature_group_name (str): name of the Feature Group. sagemaker_fs_runtime_client_config (Config): instance of the Config class for boto calls. sagemaker_session (Session): session instance to perform boto calls. data_frame (DataFrame): pandas DataFrame to be ingested to the given feature group. max_workers (int): number of threads to create. max_processes (int): number of processes to create. Each process spawns ``max_workers`` threads. profile_name (str): the profile credential should be used for ``PutRecord`` (default: None). """ feature_group_name: str = attr.ib() sagemaker_fs_runtime_client_config: Config = attr.ib(default=None) sagemaker_session: Session = attr.ib(default=None) max_workers: int = attr.ib(default=1) max_processes: int = attr.ib(default=1) profile_name: str = attr.ib(default=None) _async_result: AsyncResult = attr.ib(default=None) _processing_pool: ProcessingPool = attr.ib(default=None) _failed_indices: List[int] = attr.ib(factory=list) @staticmethod def _ingest_single_batch( data_frame: DataFrame, feature_group_name: str, client_config: Config, start_index: int, end_index: int, profile_name: str = None, ) -> List[int]: """Ingest a single batch of DataFrame rows into FeatureStore. Args: data_frame (DataFrame): source DataFrame to be ingested. feature_group_name (str): name of the Feature Group. client_config (Config): Configuration for the sagemaker feature store runtime client to perform boto calls. start_index (int): starting position to ingest in this batch. end_index (int): ending position to ingest in this batch. 
            profile_name (str): the name of the profile whose credentials should be used
                for ``PutRecord`` calls (default: None).

        Returns:
            List of row indices that failed to be ingested.
        """
        retry_config = client_config.retries
        if "max_attempts" not in retry_config and "total_max_attempts" not in retry_config:
            client_config = copy.deepcopy(client_config)
            client_config.retries = {"max_attempts": 10, "mode": "standard"}
        sagemaker_fs_runtime_client = boto3.Session(profile_name=profile_name).client(
            service_name="sagemaker-featurestore-runtime", config=client_config
        )

        logger.info("Started ingesting index %d to %d", start_index, end_index)
        failed_rows = list()
        for row in data_frame[start_index:end_index].itertuples():
            IngestionManagerPandas._ingest_row(
                data_frame=data_frame,
                row=row,
                feature_group_name=feature_group_name,
                sagemaker_fs_runtime_client=sagemaker_fs_runtime_client,
                failed_rows=failed_rows,
            )
        return failed_rows

    @property
    def failed_rows(self) -> List[int]:
        """Get rows that failed to ingest.

        Returns:
            List of row indices that failed to be ingested.
        """
        return self._failed_indices

    def wait(self, timeout=None):
        """Wait for the ingestion process to finish.

        Args:
            timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
                if timeout is reached.
        """
        try:
            results = self._async_result.get(timeout=timeout)
        except KeyboardInterrupt as i:
            # terminate workers abruptly on keyboard interrupt.
            self._processing_pool.terminate()
            self._processing_pool.close()
            self._processing_pool.clear()
            raise i
        else:
            # terminate normally
            self._processing_pool.close()
            self._processing_pool.clear()

        self._failed_indices = [
            failed_index for failed_indices in results for failed_index in failed_indices
        ]

        if len(self._failed_indices) > 0:
            raise IngestionError(
                self._failed_indices,
                f"Failed to ingest some data into FeatureGroup {self.feature_group_name}",
            )

    @staticmethod
    def _ingest_row(
        data_frame: DataFrame,
        row: int,
        feature_group_name: str,
        sagemaker_fs_runtime_client: Session,
        failed_rows: List[int],
    ):
        """Ingest a single DataFrame row into FeatureStore.

        Indices of rows that fail to be ingested are appended to ``failed_rows``.

        Args:
            data_frame (DataFrame): source DataFrame to be ingested.
            row (int): current row that is being ingested.
            feature_group_name (str): name of the Feature Group.
            sagemaker_fs_runtime_client (Session): client instance used to call ``PutRecord``.
            failed_rows (List[int]): list of indices from the data frame for which ingestion
                failed.
        """
        record = [
            FeatureValue(
                feature_name=data_frame.columns[index - 1],
                value_as_string=str(row[index]),
            )
            for index in range(1, len(row))
            if pd.notna(row[index])
        ]
        try:
            sagemaker_fs_runtime_client.put_record(
                FeatureGroupName=feature_group_name,
                Record=[value.to_dict() for value in record],
            )
        except Exception as e:  # pylint: disable=broad-except
            logger.error("Failed to ingest row %d: %s", row[0], e)
            failed_rows.append(row[0])

    def _run_single_process_single_thread(self, data_frame: DataFrame):
        """Ingest utilizing a single process and a single thread.

        Args:
            data_frame (DataFrame): source DataFrame to be ingested.
""" logger.info("Started ingesting index %d to %d") failed_rows = list() sagemaker_fs_runtime_client = self.sagemaker_session.sagemaker_featurestore_runtime_client for row in data_frame.itertuples(): IngestionManagerPandas._ingest_row( data_frame=data_frame, row=row, feature_group_name=self.feature_group_name, sagemaker_fs_runtime_client=sagemaker_fs_runtime_client, failed_rows=failed_rows, ) self._failed_indices = failed_rows if len(self._failed_indices) > 0: raise IngestionError( self._failed_indices, f"Failed to ingest some data into FeatureGroup {self.feature_group_name}", ) def _run_multi_process(self, data_frame: DataFrame, wait=True, timeout=None): """Start the ingestion process with the specified number of processes. Args: data_frame (DataFrame): source DataFrame to be ingested. wait (bool): whether to wait for the ingestion to finish or not. timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised if timeout is reached. """ # pylint: disable=I1101 batch_size = math.ceil(data_frame.shape[0] / self.max_processes) # pylint: enable=I1101 args = [] for i in range(self.max_processes): start_index = min(i * batch_size, data_frame.shape[0]) end_index = min(i * batch_size + batch_size, data_frame.shape[0]) args += [ ( self.max_workers, self.feature_group_name, self.sagemaker_fs_runtime_client_config, data_frame[start_index:end_index], start_index, timeout, self.profile_name, ) ] def init_worker(): # ignore keyboard interrupts in child processes. signal.signal(signal.SIGINT, signal.SIG_IGN) self._processing_pool = ProcessingPool(self.max_processes, init_worker) self._processing_pool.restart(force=True) f = lambda x: IngestionManagerPandas._run_multi_threaded(*x) # noqa: E731 self._async_result = self._processing_pool.amap(f, args) if wait: self.wait(timeout=timeout) @staticmethod def _run_multi_threaded( max_workers: int, feature_group_name: str, sagemaker_fs_runtime_client_config: Config, data_frame: DataFrame, row_offset=0, timeout=None, profile_name=None, ) -> List[int]: """Start the ingestion process. Args: data_frame (DataFrame): source DataFrame to be ingested. row_offset (int): if ``data_frame`` is a partition of a parent DataFrame, then the index of the parent where ``data_frame`` starts. Otherwise, 0. wait (bool): whether to wait for the ingestion to finish or not. timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised if timeout is reached. profile_name (str): the profile credential should be used for ``PutRecord`` (default: None). Returns: List of row indices that failed to be ingested. 
""" executor = ThreadPoolExecutor(max_workers=max_workers) # pylint: disable=I1101 batch_size = math.ceil(data_frame.shape[0] / max_workers) # pylint: enable=I1101 futures = {} for i in range(max_workers): start_index = min(i * batch_size, data_frame.shape[0]) end_index = min(i * batch_size + batch_size, data_frame.shape[0]) futures[ executor.submit( IngestionManagerPandas._ingest_single_batch, feature_group_name=feature_group_name, data_frame=data_frame, start_index=start_index, end_index=end_index, client_config=sagemaker_fs_runtime_client_config, profile_name=profile_name, ) ] = (start_index + row_offset, end_index + row_offset) failed_indices = list() for future in as_completed(futures, timeout=timeout): start, end = futures[future] failed_rows = future.result() if not failed_rows: logger.info("Successfully ingested row %d to %d", start, end) failed_indices += failed_rows executor.shutdown(wait=False) return failed_indices def run(self, data_frame: DataFrame, wait=True, timeout=None): """Start the ingestion process. Args: data_frame (DataFrame): source DataFrame to be ingested. wait (bool): whether to wait for the ingestion to finish or not. timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised if timeout is reached. """ if self.max_workers == 1 and self.max_processes == 1 and self.profile_name is None: self._run_single_process_single_thread(data_frame=data_frame) else: self._run_multi_process(data_frame=data_frame, wait=wait, timeout=timeout) class IngestionError(Exception): """Exception raised for errors during ingestion. Attributes: failed_rows: list of indices from the data frame for which ingestion failed. message: explanation of the error """ def __init__(self, failed_rows, message): super(IngestionError, self).__init__(message) self.failed_rows = failed_rows self.message = message def __str__(self) -> str: """String representation of the error.""" return f"{self.failed_rows} -> {self.message}" @attr.s class FeatureGroup: """FeatureGroup definition. This class instantiates a FeatureGroup object that comprises of a name for the FeatureGroup, session instance, and a list of feature definition objects i.e., FeatureDefinition. Attributes: name (str): name of the FeatureGroup instance. sagemaker_session (Session): session instance to perform boto calls. If None, a new Session will be created. feature_definitions (Sequence[FeatureDefinition]): list of FeatureDefinitions. 
""" name: str = attr.ib(factory=str) sagemaker_session: Session = attr.ib(factory=Session) feature_definitions: Sequence[FeatureDefinition] = attr.ib(factory=list) _INTEGER_TYPES = [ "int_", "int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64", ] _FLOAT_TYPES = ["float_", "float16", "float32", "float64"] DTYPE_TO_FEATURE_DEFINITION_CLS_MAP: Dict[str, FeatureTypeEnum] = { type: FeatureTypeEnum.INTEGRAL for type in _INTEGER_TYPES } DTYPE_TO_FEATURE_DEFINITION_CLS_MAP.update( {type: FeatureTypeEnum.FRACTIONAL for type in _FLOAT_TYPES} ) DTYPE_TO_FEATURE_DEFINITION_CLS_MAP["string"] = FeatureTypeEnum.STRING DTYPE_TO_FEATURE_DEFINITION_CLS_MAP["object"] = FeatureTypeEnum.STRING _FEATURE_TYPE_TO_DDL_DATA_TYPE_MAP = { FeatureTypeEnum.INTEGRAL.value: "INT", FeatureTypeEnum.FRACTIONAL.value: "FLOAT", FeatureTypeEnum.STRING.value: "STRING", } def create( self, s3_uri: Union[str, bool], record_identifier_name: str, event_time_feature_name: str, role_arn: str = None, online_store_kms_key_id: str = None, enable_online_store: bool = False, ttl_duration: TtlDuration = None, offline_store_kms_key_id: str = None, disable_glue_table_creation: bool = False, data_catalog_config: DataCatalogConfig = None, description: str = None, tags: List[Dict[str, str]] = None, table_format: TableFormatEnum = None, ) -> Dict[str, Any]: """Create a SageMaker FeatureStore FeatureGroup. Args: s3_uri (Union[str, bool]): S3 URI of the offline store, set to ``False`` to disable offline store. record_identifier_name (str): name of the record identifier feature. event_time_feature_name (str): name of the event time feature. role_arn (str): ARN of the role used to call CreateFeatureGroup. online_store_kms_key_id (str): KMS key ARN for online store (default: None). ttl_duration (TtlDuration): Default time to live duration for records (default: None). enable_online_store (bool): whether to enable online store or not (default: False). offline_store_kms_key_id (str): KMS key ARN for offline store (default: None). If a KMS encryption key is not specified, SageMaker encrypts all data at rest using the default AWS KMS key. By defining your bucket-level key for SSE, you can reduce the cost of AWS KMS requests. For more information, see `Bucket Key <https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucket-key.html>`_ in the Amazon S3 User Guide. disable_glue_table_creation (bool): whether to turn off Glue table creation or not (default: False). data_catalog_config (DataCatalogConfig): configuration for Metadata store (default: None). description (str): description of the FeatureGroup (default: None). tags (List[Dict[str, str]]): list of tags for labeling a FeatureGroup (default: None). table_format (TableFormatEnum): format of the offline store table (default: None). Returns: Response dict from service. """ role_arn = resolve_value_from_config( role_arn, FEATURE_GROUP_ROLE_ARN_PATH, sagemaker_session=self.sagemaker_session ) offline_store_kms_key_id = resolve_value_from_config( offline_store_kms_key_id, FEATURE_GROUP_OFFLINE_STORE_KMS_KEY_ID_PATH, sagemaker_session=self.sagemaker_session, ) online_store_kms_key_id = resolve_value_from_config( online_store_kms_key_id, FEATURE_GROUP_ONLINE_STORE_KMS_KEY_ID_PATH, sagemaker_session=self.sagemaker_session, ) if not role_arn: # Originally IAM role was a required parameter. # Now we marked that as Optional because we can fetch it from SageMakerConfig, # Because of marking that parameter as optional, we should validate if it is None, even # after fetching the config. 
            raise ValueError("An AWS IAM role is required to create a Feature Group.")

        create_feature_store_args = dict(
            feature_group_name=self.name,
            record_identifier_name=record_identifier_name,
            event_time_feature_name=event_time_feature_name,
            feature_definitions=[
                feature_definition.to_dict() for feature_definition in self.feature_definitions
            ],
            role_arn=role_arn,
            description=description,
            tags=tags,
        )

        # online store configuration
        if enable_online_store:
            online_store_config = OnlineStoreConfig(
                enable_online_store=enable_online_store,
                ttl_duration=ttl_duration,
            )
            if online_store_kms_key_id is not None:
                online_store_config.online_store_security_config = OnlineStoreSecurityConfig(
                    kms_key_id=online_store_kms_key_id
                )
            create_feature_store_args.update(
                {"online_store_config": online_store_config.to_dict()}
            )

        # offline store configuration
        if s3_uri:
            s3_storage_config = S3StorageConfig(s3_uri=s3_uri)
            if offline_store_kms_key_id:
                s3_storage_config.kms_key_id = offline_store_kms_key_id
            offline_store_config = OfflineStoreConfig(
                s3_storage_config=s3_storage_config,
                disable_glue_table_creation=disable_glue_table_creation,
                data_catalog_config=data_catalog_config,
                table_format=table_format,
            )
            create_feature_store_args.update(
                {"offline_store_config": offline_store_config.to_dict()}
            )

        return self.sagemaker_session.create_feature_group(**create_feature_store_args)

    def delete(self):
        """Delete a FeatureGroup."""
        self.sagemaker_session.delete_feature_group(feature_group_name=self.name)

    def describe(self, next_token: str = None) -> Dict[str, Any]:
        """Describe a FeatureGroup.

        Args:
            next_token (str): token to get the next page of features.

        Returns:
            Response dict from the service.
        """
        return self.sagemaker_session.describe_feature_group(
            feature_group_name=self.name, next_token=next_token
        )

    def update(
        self,
        feature_additions: Sequence[FeatureDefinition] = None,
        online_store_config: OnlineStoreConfigUpdate = None,
    ) -> Dict[str, Any]:
        """Update a FeatureGroup and add new features from the given feature definitions.

        Args:
            feature_additions (Sequence[FeatureDefinition]): list of feature definitions
                to be added.
            online_store_config (OnlineStoreConfigUpdate): online store config to be updated.

        Returns:
            Response dict from service.
        """
        if feature_additions is None:
            feature_additions_parameter = None
        else:
            feature_additions_parameter = [
                feature_addition.to_dict() for feature_addition in feature_additions
            ]

        if online_store_config is None:
            online_store_config_parameter = None
        else:
            online_store_config_parameter = online_store_config.to_dict()

        return self.sagemaker_session.update_feature_group(
            feature_group_name=self.name,
            feature_additions=feature_additions_parameter,
            online_store_config=online_store_config_parameter,
        )

    def update_feature_metadata(
        self,
        feature_name: str,
        description: str = None,
        parameter_additions: Sequence[FeatureParameter] = None,
        parameter_removals: Sequence[str] = None,
    ) -> Dict[str, Any]:
        """Update a feature's metadata and add or remove parameters.

        Args:
            feature_name (str): name of the feature to update.
            description (str): description of the feature to update.
            parameter_additions (Sequence[FeatureParameter]): list of feature parameters
                to be added.
            parameter_removals (Sequence[str]): list of feature parameter keys to be removed.

        Returns:
            Response dict from service.
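
        Example:
            A brief sketch; the feature name, description, and parameter values below are
            illustrative placeholders::

                feature_group.update_feature_metadata(
                    feature_name="spend",
                    description="Total customer spend.",
                    parameter_additions=[FeatureParameter(key="unit", value="USD")],
                    parameter_removals=["deprecated-key"],
                )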
""" return self.sagemaker_session.update_feature_metadata( feature_group_name=self.name, feature_name=feature_name, description=description, parameter_additions=[ parameter_addition.to_dict() for parameter_addition in (parameter_additions or []) ], parameter_removals=(parameter_removals or []), ) def describe_feature_metadata(self, feature_name: str) -> Dict[str, Any]: """Describe feature metadata by feature name. Args: feature_name (str): name of the feature. Returns: Response dict from service. """ return self.sagemaker_session.describe_feature_metadata( feature_group_name=self.name, feature_name=feature_name ) def list_tags(self) -> Sequence[Dict[str, str]]: """List all tags for a feature group. Returns: list of key, value pair of the tags. """ feature_group_arn = self.sagemaker_session.describe_feature_group( feature_group_name=self.name ).get("FeatureGroupArn") return self.sagemaker_session.list_tags(resource_arn=feature_group_arn) def list_parameters_for_feature_metadata(self, feature_name: str) -> Sequence[Dict[str, str]]: """List all parameters for a feature metadata. Args: feature_name (str): name of the feature. Returns: list of key, value pair of the parameters. """ return self.sagemaker_session.describe_feature_metadata( feature_group_name=self.name, feature_name=feature_name ).get("Parameters") def load_feature_definitions( self, data_frame: DataFrame, ) -> Sequence[FeatureDefinition]: """Load feature definitions from a Pandas DataFrame. Column name is used as feature name. Feature type is inferred from the dtype of the column. Dtype int_, int8, int16, int32, int64, uint8, uint16, uint32 and uint64 are mapped to Integral feature type. Dtype float_, float16, float32 and float64 are mapped to Fractional feature type. string dtype is mapped to String feature type. No feature definitions will be loaded if the given data_frame contains unsupported dtypes. Args: data_frame (DataFrame): Returns: list of FeatureDefinition """ feature_definitions = [] for column in data_frame: feature_type = self.DTYPE_TO_FEATURE_DEFINITION_CLS_MAP.get( str(data_frame[column].dtype).lower(), None ) if feature_type: feature_definitions.append( FeatureDefinition(feature_name=column, feature_type=feature_type) ) else: raise ValueError( f"Failed to infer Feature type based on dtype {data_frame[column].dtype} " f"for column {column}." ) self.feature_definitions = feature_definitions return self.feature_definitions def get_record( self, record_identifier_value_as_string: str, feature_names: Sequence[str] = None, ) -> Sequence[Dict[str, str]]: """Get a single record in a FeatureGroup Args: record_identifier_value_as_string (String): a String representing the value of the record identifier. feature_names (Sequence[String]): a list of Strings representing feature names. """ return self.sagemaker_session.get_record( record_identifier_value_as_string=record_identifier_value_as_string, feature_group_name=self.name, feature_names=feature_names, ).get("Record") def put_record(self, record: Sequence[FeatureValue], ttl_duration: TtlDuration = None): """Put a single record in the FeatureGroup. Args: record (Sequence[FeatureValue]): a list contains feature values. ttl_duration (TtlDuration): customer specified ttl duration. 
""" if ttl_duration is not None: return self.sagemaker_session.put_record( feature_group_name=self.name, record=[value.to_dict() for value in record], ttl_duration=ttl_duration.to_dict(), ) return self.sagemaker_session.put_record( feature_group_name=self.name, record=[value.to_dict() for value in record], ) def delete_record( self, record_identifier_value_as_string: str, event_time: str, deletion_mode: DeletionModeEnum = DeletionModeEnum.SOFT_DELETE, ): """Delete a single record from a FeatureGroup. Args: record_identifier_value_as_string (String): a String representing the value of the record identifier. event_time (String): a timestamp format String indicating when the deletion event occurred. deletion_mode (DeletionModeEnum): deletion mode for deleting record. (default: DetectionModeEnum.SOFT_DELETE) """ return self.sagemaker_session.delete_record( feature_group_name=self.name, record_identifier_value_as_string=record_identifier_value_as_string, event_time=event_time, deletion_mode=deletion_mode.value, ) def ingest( self, data_frame: DataFrame, max_workers: int = 1, max_processes: int = 1, wait: bool = True, timeout: Union[int, float] = None, profile_name: str = None, ) -> IngestionManagerPandas: """Ingest the content of a pandas DataFrame to feature store. ``max_worker`` the number of threads created to work on different partitions of the ``data_frame`` in parallel. ``max_processes`` the number of processes will be created to work on different partitions of the ``data_frame`` in parallel, each with ``max_worker`` threads. The ingest function attempts to ingest all records in the data frame. SageMaker Feature Store throws an exception if it fails to ingest any records. If ``wait`` is ``True``, Feature Store runs the ``ingest`` function synchronously. You receive an ``IngestionError`` if there are any records that can't be ingested. If ``wait`` is ``False``, Feature Store runs the ``ingest`` function asynchronously. Instead of setting ``wait`` to ``True`` in the ``ingest`` function, you can invoke the ``wait`` function on the returned instance of ``IngestionManagerPandas`` to run the ``ingest`` function synchronously. To access the rows that failed to ingest, set ``wait`` to ``False``. The ``IngestionError.failed_rows`` object saves all of the rows that failed to ingest. `profile_name` argument is an optional one. It will use the default credential if None is passed. This `profile_name` is used in the sagemaker_featurestore_runtime client only. See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for more about the default credential. Args: data_frame (DataFrame): data_frame to be ingested to feature store. max_workers (int): number of threads to be created. max_processes (int): number of processes to be created. Each process spawns ``max_worker`` number of threads. wait (bool): whether to wait for the ingestion to finish or not. timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised if timeout is reached. profile_name (str): the profile credential should be used for ``PutRecord`` (default: None). Returns: An instance of IngestionManagerPandas. 
""" if max_processes <= 0: raise RuntimeError("max_processes must be greater than 0.") if max_workers <= 0: raise RuntimeError("max_workers must be greater than 0.") if profile_name is None and self.sagemaker_session.boto_session.profile_name != "default": profile_name = self.sagemaker_session.boto_session.profile_name manager = IngestionManagerPandas( feature_group_name=self.name, sagemaker_session=self.sagemaker_session, sagemaker_fs_runtime_client_config=self.sagemaker_session.sagemaker_featurestore_runtime_client.meta.config, max_workers=max_workers, max_processes=max_processes, profile_name=profile_name, ) manager.run(data_frame=data_frame, wait=wait, timeout=timeout) return manager def athena_query(self) -> AthenaQuery: """Create an AthenaQuery instance. Returns: An instance of AthenaQuery initialized with data catalog configurations. """ response = self.describe() data_catalog_config = response.get("OfflineStoreConfig").get("DataCatalogConfig", None) disable_glue = data_catalog_config.get("DisableGlueTableCreation", False) if data_catalog_config: query = AthenaQuery( catalog=data_catalog_config["Catalog"] if disable_glue else "AwsDataCatalog", database=data_catalog_config["Database"], table_name=data_catalog_config["TableName"], sagemaker_session=self.sagemaker_session, ) return query raise RuntimeError("No metastore is configured with this feature group.") def as_hive_ddl(self, database: str = "sagemaker_featurestore", table_name: str = None) -> str: """Generate Hive DDL commands to define or change structure of tables or databases in Hive. Schema of the table is generated based on the feature definitions. Columns are named after feature name and data-type are inferred based on feature type. Integral feature type is mapped to INT data-type. Fractional feature type is mapped to FLOAT data-type. String feature type is mapped to STRING data-type. Args: database: name of the database. If not set "sagemaker_featurestore" will be used. table_name: name of the table. If not set the name of this feature group will be used. Returns: Generated create table DDL string. """ if not table_name: table_name = self.name resolved_output_s3_uri = ( self.describe() .get("OfflineStoreConfig") .get("S3StorageConfig") .get("ResolvedOutputS3Uri") ) ddl = f"CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table_name} (\n" for definition in self.feature_definitions: ddl += ( f" {definition.feature_name} " f"{self._FEATURE_TYPE_TO_DDL_DATA_TYPE_MAP.get(definition.feature_type.value)}\n" ) ddl += " write_time TIMESTAMP\n" ddl += " event_time TIMESTAMP\n" ddl += " is_deleted BOOLEAN\n" ddl += ")\n" ddl += ( "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'\n" " STORED AS\n" " INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'\n" " OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'\n" f"LOCATION '{resolved_output_s3_uri}'" ) return ddl