# mypy: disable-error-code=name-defined
"""Amazon Neptune Module."""
import logging
import re
import time
from typing import Any, Callable, Dict, List, Literal, Optional, TypeVar, Union
import boto3
import awswrangler.neptune._gremlin_init as gremlin
import awswrangler.pandas as pd
from awswrangler import _utils, exceptions, s3
from awswrangler._config import apply_configs
from awswrangler.neptune._client import BulkLoadParserConfiguration, NeptuneClient
gremlin_python = _utils.import_optional_dependency("gremlin_python")
opencypher = _utils.import_optional_dependency("requests")
sparql = _utils.import_optional_dependency("SPARQLWrapper")
_logger: logging.Logger = logging.getLogger(__name__)
FuncT = TypeVar("FuncT", bound=Callable[..., Any])
@_utils.check_optional_dependency(gremlin_python, "gremlin_python")
def execute_gremlin(client: NeptuneClient, query: str) -> pd.DataFrame:
"""Return results of a Gremlin traversal as pandas DataFrame.
Parameters
----------
client: NeptuneClient
instance of the neptune client to use
query: str
The gremlin traversal to execute
Returns
-------
pandas.DataFrame
Results as Pandas DataFrame
Examples
--------
Run a Gremlin Query
>>> import awswrangler as wr
>>> client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
>>> df = wr.neptune.execute_gremlin(client, "g.V().limit(1)")
"""
results = client.read_gremlin(query)
df = pd.DataFrame.from_records(results)
return df
@_utils.check_optional_dependency(opencypher, "opencypher")
def execute_opencypher(client: NeptuneClient, query: str) -> pd.DataFrame:
"""Return results of a openCypher traversal as pandas DataFrame.
Parameters
----------
client: NeptuneClient
instance of the neptune client to use
query: str
The openCypher query to execute
Returns
-------
pandas.DataFrame
Results as Pandas DataFrame
Examples
--------
Run an openCypher query
>>> import awswrangler as wr
>>> client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
>>> resp = wr.neptune.execute_opencypher(client, "MATCH (n) RETURN n LIMIT 1")
"""
resp = client.read_opencypher(query)
df = pd.DataFrame.from_dict(resp)
return df
@_utils.check_optional_dependency(sparql, "SPARQLWrapper")
def execute_sparql(client: NeptuneClient, query: str) -> pd.DataFrame:
"""Return results of a SPARQL query as pandas DataFrame.
Parameters
----------
client: NeptuneClient
instance of the neptune client to use
query: str
The SPARQL query to execute
Returns
-------
pandas.DataFrame
Results as Pandas DataFrame
Examples
--------
Run a SPARQL query
>>> import awswrangler as wr
>>> client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
>>> df = wr.neptune.execute_sparql(client, "PREFIX foaf:
SELECT ?name
WHERE {
?person foaf:name ?name .
"""
data = client.read_sparql(query)
df = None
if "results" in data and "bindings" in data["results"]:
df = pd.DataFrame(data["results"]["bindings"])
df.applymap(lambda x: x["value"])
else:
df = pd.DataFrame(data)
return df
@_utils.check_optional_dependency(gremlin_python, "gremlin_python")
def to_property_graph(
client: NeptuneClient, df: pd.DataFrame, batch_size: int = 50, use_header_cardinality: bool = True
) -> bool:
"""Write records stored in a DataFrame into Amazon Neptune.
If writing to a property graph then DataFrames for vertices and edges must be written separately.
DataFrames for vertices must have a ~label column with the label and a ~id column for the vertex id.
If the ~id column does not exist, the specified id does not exist, or is empty, then a new vertex will be added.
If no ~label column exists an exception will be thrown.
DataFrames for edges must have a ~id, ~label, ~to, and ~from column. If the ~id column does not exist
the specified id does not exist, or is empty, then a new edge will be added. If no ~label, ~to, or ~from column
exists an exception will be thrown.
If you would like to save data using `single` cardinality then you can postfix (single) to the column header and
set use_header_cardinality=True (default). e.g. A column named `name(single)` will save the `name` property as
single cardinality. You can disable this by setting `use_header_cardinality=False`.
Parameters
----------
client: NeptuneClient
instance of the neptune client to use
df: pandas.DataFrame
Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
batch_size: int
The number of rows to save at a time. Default 50
use_header_cardinality: bool
If True, then the header cardinality will be used to save the data. Default True
Returns
-------
bool
True if records were written
Examples
--------
Writing to Amazon Neptune
>>> import awswrangler as wr
>>> client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
>>> wr.neptune.to_property_graph(
...     client=client,
...     df=df
... )
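A minimal vertex DataFrame and edge DataFrame, sketched here with purely illustrative ids, labels, and property
values, could be constructed like this:
>>> import pandas as pd
>>> vertices = pd.DataFrame({
...     "~id": ["v1", "v2"],
...     "~label": ["person", "person"],
...     "name(single)": ["alice", "bob"],
... })
>>> edges = pd.DataFrame({
...     "~id": ["e1"],
...     "~label": ["knows"],
...     "~from": ["v1"],
...     "~to": ["v2"],
... })
>>> wr.neptune.to_property_graph(client=client, df=vertices)
>>> wr.neptune.to_property_graph(client=client, df=edges)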
"""
# check if ~id and ~label column exist and if not throw error
g = gremlin.traversal().withGraph(gremlin.Graph())
is_edge_df = False
is_update_df = True
if "~id" in df.columns:
if "~label" in df.columns:
is_update_df = False
if "~to" in df.columns and "~from" in df.columns:
is_edge_df = True
else:
raise exceptions.InvalidArgumentValue(
"DataFrame must contain at least a ~id and a ~label column to be saved to Amazon Neptune"
)
# Loop through items in the DF
for index, row in df.iterrows():
# build up a query
if is_update_df:
g = _build_gremlin_update(g, row, use_header_cardinality)
elif is_edge_df:
g = _build_gremlin_insert_edges(g, row.to_dict(), use_header_cardinality)
else:
g = _build_gremlin_insert_vertices(g, row.to_dict(), use_header_cardinality)
# run the query
if index > 0 and index % batch_size == 0:
res = _run_gremlin_insert(client, g)
if res:
g = gremlin.Graph().traversal()
return _run_gremlin_insert(client, g)
@_utils.check_optional_dependency(sparql, "SPARQLWrapper")
def to_rdf_graph(
client: NeptuneClient,
df: pd.DataFrame,
batch_size: int = 50,
subject_column: str = "s",
predicate_column: str = "p",
object_column: str = "o",
graph_column: str = "g",
) -> bool:
"""Write records stored in a DataFrame into Amazon Neptune.
The DataFrame must consist of triples with column names for the subject, predicate, and object specified.
If you want to add data into a named graph then you will also need the graph column.
Parameters
----------
client: NeptuneClient
instance of the neptune client to use
df: pandas.DataFrame
Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
batch_size: int
The number of rows to save at a time. Default 50
subject_column: str, optional
The column name in the DataFrame for the subject. Defaults to 's'
predicate_column: str, optional
The column name in the DataFrame for the predicate. Defaults to 'p'
object_column: str, optional
The column name in the DataFrame for the object. Defaults to 'o'
graph_column: str, optional
The column name in the DataFrame for the graph if sending across quads. Defaults to 'g'
Returns
-------
bool
True if records were written
Examples
--------
Writing to Amazon Neptune
>>> import awswrangler as wr
>>> client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
>>> wr.neptune.to_rdf_graph(
...     client=client,
...     df=df
... )
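A DataFrame of triples using the default column names, with placeholder IRIs, could be built like:
>>> import pandas as pd
>>> triples = pd.DataFrame({
...     "s": ["http://example.com/person/alice"],
...     "p": ["http://example.com/relationship/knows"],
...     "o": ["http://example.com/person/bob"],
... })
>>> wr.neptune.to_rdf_graph(client=client, df=triples)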
"""
is_quads = False
if pd.Series([subject_column, object_column, predicate_column]).isin(df.columns).all():
if graph_column in df.columns:
is_quads = True
else:
raise exceptions.InvalidArgumentValue(
"""DataFrame must contain at least the subject, predicate, and object columns defined or the defaults
(s, p, o) to be saved to Amazon Neptune"""
)
query = ""
# Loop through items in the DF
for index, row in df.iterrows():
# build up a query
if is_quads:
insert = f"""INSERT DATA {{ GRAPH <{row[graph_column]}> {{<{row[subject_column]}>
<{str(row[predicate_column])}> <{row[object_column]}> . }} }}; """
query = query + insert
else:
insert = f"""INSERT DATA {{ <{row[subject_column]}> <{str(row[predicate_column])}>
<{row[object_column]}> . }}; """
query = query + insert
# run the query
if index > 0 and index % batch_size == 0:
res = client.write_sparql(query)
if res:
query = ""
return client.write_sparql(query)
BULK_LOAD_IN_PROGRESS_STATES = {"LOAD_IN_QUEUE", "LOAD_NOT_STARTED", "LOAD_IN_PROGRESS"}
@_utils.validate_distributed_kwargs(
unsupported_kwargs=["boto3_session", "s3_additional_kwargs"],
)
@apply_configs
@_utils.check_optional_dependency(sparql, "SPARQLWrapper")
def bulk_load(
client: NeptuneClient,
df: pd.DataFrame,
path: str,
iam_role: str,
neptune_load_wait_polling_delay: float = 0.25,
load_parallelism: Literal["LOW", "MEDIUM", "HIGH", "OVERSUBSCRIBE"] = "HIGH",
parser_configuration: Optional[BulkLoadParserConfiguration] = None,
update_single_cardinality_properties: Literal["TRUE", "FALSE"] = "FALSE",
queue_request: Literal["TRUE", "FALSE"] = "FALSE",
dependencies: Optional[List[str]] = None,
keep_files: bool = False,
use_threads: Union[bool, int] = True,
boto3_session: Optional[boto3.Session] = None,
s3_additional_kwargs: Optional[Dict[str, str]] = None,
) -> None:
"""
Write records into Amazon Neptune using the Neptune Bulk Loader.
The DataFrame will be written to S3 and then loaded to Neptune using the
`Bulk Loader <https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load.html>`_.
Parameters
----------
client: NeptuneClient
Instance of the neptune client to use
df: pandas.DataFrame
`Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ to write to Neptune.
path: str
S3 Path that the Neptune Bulk Loader will load data from.
iam_role: str
The Amazon Resource Name (ARN) for an IAM role to be assumed by the Neptune DB instance for access to the S3 bucket.
For information about creating a role that has access to Amazon S3 and then associating it with a Neptune cluster,
see `Prerequisites: IAM Role and Amazon S3 Access <https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM.html>`_.
neptune_load_wait_polling_delay: float
Interval in seconds for how often the function will check if the Neptune bulk load has completed.
load_parallelism: str
Specifies the number of threads used by Neptune's bulk load process.
parser_configuration: dict[str, Any], optional
An optional object with additional parser configuration values.
Each of the child parameters is also optional: ``namedGraphUri``, ``baseUri`` and ``allowEmptyStrings``.
update_single_cardinality_properties: str
An optional parameter that controls how the bulk loader
treats a new value for single-cardinality vertex or edge properties.
queue_request: str
An optional flag parameter that indicates whether the load request can be queued up or not.
If omitted or set to ``"FALSE"``, the load request will fail if another load job is already running.
dependencies: list[str], optional
An optional parameter that can make a queued load request contingent on the successful completion of one or more previous jobs in the queue.
keep_files: bool
Whether to keep stage files or delete them. False by default.
use_threads: bool | int
True to enable concurrent requests, False to disable multiple threads.
If enabled os.cpu_count() will be used as the max number of threads.
If integer is provided, specified number is used.
boto3_session: boto3.Session(), optional
Boto3 Session. The default boto3 session will be used if boto3_session receives None.
s3_additional_kwargs: Dict[str, str], optional
Forwarded to botocore requests.
e.g. ``s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'YOUR_KMS_KEY_ARN'}``
Examples
--------
>>> import awswrangler as wr
>>> import pandas as pd
>>> client = wr.neptune.connect("MY_NEPTUNE_ENDPOINT", 8182)
>>> frame = pd.DataFrame([{"~id": "0", "~labels": ["version"], "~properties": {"type": "version"}}])
>>> wr.neptune.bulk_load(
... client=client,
... df=frame,
... path="s3://my-bucket/stage-files/",
... iam_role="arn:aws:iam::XXX:role/XXX"
... )
"""
path = path[:-1] if path.endswith("*") else path
path = path if path.endswith("/") else f"{path}/"
if s3.list_objects(path=path, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs):
raise exceptions.InvalidArgument(
f"The received S3 path ({path}) is not empty. "
"Please, provide a different path or use wr.s3.delete_objects() to clean up the current one."
)
try:
s3.to_csv(df, path, use_threads=use_threads, dataset=True, index=False)
bulk_load_from_files(
client=client,
path=path,
iam_role=iam_role,
format="csv",
neptune_load_wait_polling_delay=neptune_load_wait_polling_delay,
load_parallelism=load_parallelism,
parser_configuration=parser_configuration,
update_single_cardinality_properties=update_single_cardinality_properties,
queue_request=queue_request,
dependencies=dependencies,
)
finally:
if keep_files is False:
_logger.debug("Deleting objects in S3 path: %s", path)
s3.delete_objects(
path=path,
use_threads=use_threads,
boto3_session=boto3_session,
s3_additional_kwargs=s3_additional_kwargs,
)
@apply_configs
@_utils.check_optional_dependency(sparql, "SPARQLWrapper")
def bulk_load_from_files(
client: NeptuneClient,
path: str,
iam_role: str,
format: Literal["csv", "opencypher", "ntriples", "nquads", "rdfxml", "turtle"] = "csv",
neptune_load_wait_polling_delay: float = 0.25,
load_parallelism: Literal["LOW", "MEDIUM", "HIGH", "OVERSUBSCRIBE"] = "HIGH",
parser_configuration: Optional[BulkLoadParserConfiguration] = None,
update_single_cardinality_properties: Literal["TRUE", "FALSE"] = "FALSE",
queue_request: Literal["TRUE", "FALSE"] = "FALSE",
dependencies: Optional[List[str]] = None,
) -> None:
"""
Load files from S3 into Amazon Neptune using the Neptune Bulk Loader.
For more information about the Bulk Loader see
`here <https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load.html>`_.
Parameters
----------
client: NeptuneClient
Instance of the neptune client to use
path: str
S3 Path that the Neptune Bulk Loader will load data from.
iam_role: str
The Amazon Resource Name (ARN) for an IAM role to be assumed by the Neptune DB instance for access to the S3 bucket.
For information about creating a role that has access to Amazon S3 and then associating it with a Neptune cluster,
see `Prerequisites: IAM Role and Amazon S3 Access <https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM.html>`_.
format: str
The format of the data.
neptune_load_wait_polling_delay: float
Interval in seconds for how often the function will check if the Neptune bulk load has completed.
load_parallelism: str
Specifies the number of threads used by Neptune's bulk load process.
parser_configuration: dict[str, Any], optional
An optional object with additional parser configuration values.
Each of the child parameters is also optional: ``namedGraphUri``, ``baseUri`` and ``allowEmptyStrings``.
update_single_cardinality_properties: str
An optional parameter that controls how the bulk loader
treats a new value for single-cardinality vertex or edge properties.
queue_request: str
An optional flag parameter that indicates whether the load request can be queued up or not.
If omitted or set to ``"FALSE"``, the load request will fail if another load job is already running.
dependencies: list[str], optional
An optional parameter that can make a queued load request contingent on the successful completion of one or more previous jobs in the queue.
Examples
--------
>>> import awswrangler as wr
>>> client = wr.neptune.connect("MY_NEPTUNE_ENDPOINT", 8182)
>>> wr.neptune.bulk_load_from_files(
... client=client,
... path="s3://my-bucket/stage-files/",
... iam_role="arn:aws:iam::XXX:role/XXX",
... format="csv",
... )
"""
_logger.debug("Starting Neptune Bulk Load from %s", path)
load_id = client.load(
path,
iam_role,
format=format,
parallelism=load_parallelism,
parser_configuration=parser_configuration,
update_single_cardinality_properties=update_single_cardinality_properties,
queue_request=queue_request,
dependencies=dependencies,
)
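# Poll the Neptune loader status API until the job finishes; any state outside the in-progress set
# (other than LOAD_COMPLETED) raises NeptuneLoadError.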
while True:
status_response = client.load_status(load_id)
status: str = status_response["payload"]["overallStatus"]["status"]
if status == "LOAD_COMPLETED":
break
if status not in BULK_LOAD_IN_PROGRESS_STATES:
raise exceptions.NeptuneLoadError(f"Load {load_id} failed with {status}: {status_response}")
time.sleep(neptune_load_wait_polling_delay)
_logger.debug("Neptune load %s has succeeded in loading %s data from %s", load_id, format, path)
def connect(host: str, port: int, iam_enabled: bool = False, **kwargs: Any) -> NeptuneClient:
"""Create a connection to a Neptune cluster.
Parameters
----------
host: str
The host endpoint to connect to
port: int
The port endpoint to connect to
iam_enabled: bool, optional
True if IAM is enabled on the cluster. Defaults to False.
Returns
-------
NeptuneClient
An instance of NeptuneClient connected to the specified host and port, to be passed to the other wr.neptune functions.
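Examples
--------
Create a connection (the endpoint and port below are placeholders for your own cluster's values)
>>> import awswrangler as wr
>>> client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)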
"""
return NeptuneClient(host, port, iam_enabled, **kwargs)
def _get_column_name(column: str) -> str:
if "(single)" in column.lower():
return re.compile(r"\(single\)", re.IGNORECASE).sub("", column)
return column
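# Helper used by the write paths: copies a row's columns onto the current traversal as properties.
# Reserved ~id/~label/~to/~from columns are skipped; a "(single)" header suffix selects single cardinality
# when use_header_cardinality is True, otherwise properties are written with set cardinality
# (or with no explicit cardinality at all when ignore_cardinality is True).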
def _set_properties(
g: "gremlin.GraphTraversalSource",
use_header_cardinality: bool,
row: Any,
ignore_cardinality: bool = False,
) -> "gremlin.GraphTraversalSource":
for column, value in row.items():
if column not in ["~id", "~label", "~to", "~from"]:
if ignore_cardinality and pd.notna(value):
g = g.property(_get_column_name(column), value)
elif use_header_cardinality:
# If the column header is specifying the cardinality then use it
if column.lower().find("(single)") > 0 and pd.notna(value):
g = g.property(gremlin.Cardinality.single, _get_column_name(column), value)
else:
g = _expand_properties(g, _get_column_name(column), value)
else:
# If not using header cardinality then use the default of set
g = _expand_properties(g, column, value)
return g
def _expand_properties(g: "gremlin.GraphTraversalSource", column: str, value: Any) -> "gremlin.GraphTraversalSource":
# If this is a list then expand it out into multiple property calls
if isinstance(value, list) and len(value) > 0:
for item in value:
g = g.property(gremlin.Cardinality.set_, column, item)
elif pd.notna(value):
g = g.property(gremlin.Cardinality.set_, column, value)
return g
def _build_gremlin_update(
g: "gremlin.GraphTraversalSource", row: Any, use_header_cardinality: bool
) -> "gremlin.GraphTraversalSource":
g = g.V(str(row["~id"]))
g = _set_properties(g, use_header_cardinality, row)
return g
def _build_gremlin_insert_vertices(
g: "gremlin.GraphTraversalSource", row: Any, use_header_cardinality: bool = False
) -> "gremlin.GraphTraversalSource":
g = (
g.V(str(row["~id"]))
.fold()
.coalesce(
gremlin.__.unfold(),
gremlin.__.addV(row["~label"]).property(gremlin.T.id, str(row["~id"])),
)
)
g = _set_properties(g, use_header_cardinality, row)
return g
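# Edge upsert: the fold()/coalesce(unfold(), ...) pattern is the Gremlin "get or create" idiom, so missing
# ~from/~to vertices are created as placeholder "Vertex" nodes before the edge itself is added.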
def _build_gremlin_insert_edges(
g: "gremlin.GraphTraversalSource", row: pd.Series, use_header_cardinality: bool
) -> "gremlin.GraphTraversalSource":
g = (
g.V(str(row["~from"]))
.fold()
.coalesce(
gremlin.__.unfold(),
_build_gremlin_insert_vertices(gremlin.__, {"~id": row["~from"], "~label": "Vertex"}),
)
.addE(row["~label"])
.property(gremlin.T.id, str(row["~id"]))
.to(
gremlin.__.V(str(row["~to"]))
.fold()
.coalesce(
gremlin.__.unfold(),
_build_gremlin_insert_vertices(gremlin.__, {"~id": row["~to"], "~label": "Vertex"}),
)
)
)
g = _set_properties(g, use_header_cardinality, row, ignore_cardinality=True)
return g
def _run_gremlin_insert(client: NeptuneClient, g: "gremlin.GraphTraversalSource") -> bool:
translator = gremlin.Translator("g")
s = translator.translate(g.bytecode)
s = s.replace("Cardinality.", "") # hack to fix parser error for set cardinality
s = s.replace(
".values('shape')", ""
) # hack to fix parser error for adding unknown values('shape') steps to translation.
_logger.debug(s)
res = client.write_gremlin(s)
return res
def flatten_nested_df(
df: pd.DataFrame, include_prefix: bool = True, separator: str = "_", recursive: bool = True
) -> pd.DataFrame:
"""Flatten the lists and dictionaries of the input data frame.
Parameters
----------
df: pd.DataFrame
The input data frame
include_prefix: bool, optional
If True, then it will prefix the new column name with the original column name.
Defaults to True.
separator: str, optional
The separator to use between field names when a dictionary is exploded.
Defaults to "_".
recursive: bool, optional
If True, then this will recurse the fields in the data frame. Defaults to True.
Returns
-------
pd.DataFrame
The flattened data frame
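Examples
--------
A small sketch with one nested column (the values are purely illustrative)
>>> import awswrangler as wr
>>> import pandas as pd
>>> df = pd.DataFrame([{"id": 1, "properties": {"name": "alice", "roles": ["admin", "user"]}}])
>>> flat = wr.neptune.flatten_nested_df(df)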
"""
if separator is None:
separator = "_"
df = df.reset_index()
# search for list and map
s = (df.applymap(type) == list).all()
list_columns = s[s].index.tolist()
s = (df.applymap(type) == dict).all()
dict_columns = s[s].index.tolist()
if len(list_columns) > 0 or len(dict_columns) > 0:
new_columns = []
for col in dict_columns:
# expand dictionaries horizontally
expanded = None
if include_prefix:
expanded = pd.json_normalize(df[col], sep=separator).add_prefix(f"{col}{separator}")
else:
expanded = pd.json_normalize(df[col], sep=separator).add_prefix(f"{separator}")
expanded.index = df.index
df = pd.concat([df, expanded], axis=1).drop(columns=[col])
new_columns.extend(expanded.columns)
for col in list_columns:
df = df.drop(columns=[col]).join(df[col].explode().to_frame())
new_columns.append(col)
# check if there are still dict or list fields to flatten
s = (df[new_columns].applymap(type) == list).all()
list_columns = s[s].index.tolist()
s = (df[new_columns].applymap(type) == dict).all()
dict_columns = s[s].index.tolist()
if recursive and (len(list_columns) > 0 or len(dict_columns) > 0):
df = flatten_nested_df(df, include_prefix=include_prefix, separator=separator, recursive=recursive)
return df