{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Use Spark UDF with Spark SQL and Spark DataFrame\n", "\n", "#### Topics covered in this example\n", "* Creating and registering UDF.\n", "* Special handling and best practices for UDF." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "\n", "## Prerequisites\n", "
\n", "NOTE : In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.
\n", "\n", "* The EMR cluster attached to this notebook should have the `Spark` application installed.\n", "* This notebook uses the `Spark` kernel.\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\n", "User-Defined Functions (UDFs) are user-programmable routines that act on one row. This example demonstrate how to define and register UDFs and invoke them in SQL using the `%%sql` magic.\n", "\n", "#### Why do you need UDFs ?\n", "Spark stores data in dataframes or RDDs—resilient distributed datasets. As with a traditional SQL database, you cannot create your own custom function and run that against the database directly unless you register the function first. That is, save it to the database as if it were one of the built-in database functions, like sum(), average(), count(), etc.\n", "\n", "The document: Scala UDF provides detailed information.\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a spark dataFrame." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import spark.implicits._\n", "\n", "val columns = Seq(\"No\", \"Name\")\n", "\n", "val data = Seq((\"1\", \"john jones\"),\n", " (\"2\", \"tracey smith\"),\n", " (\"3\", \"amy sanders\"))\n", "\n", "val df = data.toDF(columns:_*)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Print the table dataFrame." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.show(false)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Spark UDF to use it on DataFrame\n", "\n", "Create a function to convert a string to camel case. The function takes a string parameter and converts the first letter of every word to upper case letter." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val convertToCamelCase = (str:String) => {\n", " val arr = str.split(\" \")\n", " arr.map(f => f.substring(0,1).toUpperCase + f.substring(1,f.length)).mkString(\" \")\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now convert this function `convertToCamelCase()` to a UDF by passing the function to Spark SQL `udf()`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val convertToCamelCaseUDF = udf(convertToCamelCase)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now you can use the `convertToCamelCaseUDF()` on a DataFrame column." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.select(col(\"No\"), convertToCamelCaseUDF(col(\"Name\")).as(\"Name\") ).show(false)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Registering Spark UDF to use it on SQL\n", "\n", "In order to use a function on Spark SQL, you need to register the function with Spark using `spark.udf.register()`.\n", "\n", "Create another function convertToUpperCase to convert the whole string to upper case and register as udf." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val convertToUpperCase = (str:String) => {\n", " str.toUpperCase()\n", "}\n", "\n", "spark.udf.register(\"convertToUpperCaseUDF\", convertToUpperCase)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a temp view named `NAME_TABLE`. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "Create a temp view named `NAME_TABLE`. `createOrReplaceTempView` creates (or replaces, if a view with that name already exists) a lazily evaluated view that you can then use like a Hive table in Spark SQL. The view is not persisted, but you can run SQL queries on top of it." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.createOrReplaceTempView(\"NAME_TABLE\")\n", "spark.sql(\"select No, convertToUpperCaseUDF(Name) from NAME_TABLE\").show(false)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "You can also use the `%%sql` magic to query. Use the `convertToUpperCaseUDF` UDF and display the result as a new column, `UpperCaseName`." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "select No, Name, convertToUpperCaseUDF(Name) as UpperCaseName from NAME_TABLE" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Special Handling" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Execution order\n", "Spark SQL (including SQL and the DataFrame and Dataset API) does not guarantee the order of evaluation of subexpressions. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order. For example, logical AND and OR expressions do not have left-to-right “short-circuiting” semantics.\n", "\n", "Therefore, it is dangerous to rely on the side effects or order of evaluation of Boolean expressions, or on the order of WHERE and HAVING clauses, since such expressions and clauses can be reordered during query optimization and planning. Specifically, if a UDF relies on short-circuiting semantics in SQL for null checking, there is no guarantee that the null check will happen before the UDF is invoked. For example,\n", "\n", "```\n", "%%sql\n", "SELECT No, convertToUpperCaseUDF(Name) as Name from NAME_TABLE WHERE Name is not null and convertToUpperCaseUDF(Name) like \"%JOHN%\"\n", "```\n", "\n", "This WHERE clause does not guarantee that `convertToUpperCaseUDF` is invoked only after the nulls have been filtered out." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Handling null check\n", "UDFs are error-prone when not designed carefully, for example when a column contains the value null on some records:\n", "\n", "```\n", "val columns = Seq(\"No\", \"Name\")\n", "\n", "val data = Seq((\"1\", \"john jones\"),\n", "               (\"2\", \"tracey smith\"),\n", "               (\"3\", \"amy sanders\"),\n", "               (\"4\", null))\n", "\n", "%%sql\n", "select No, convertToUpperCaseUDF(Name) as Name from NAME_TABLE\n", "```\n", "\n", "The record with `No` 4 has the value `null` for the `Name` column. Since the UDF does not handle null, this query fails: the UDF calls `toUpperCase()` on a null reference and throws a `NullPointerException`. A null-aware version is sketched under Best practices below." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Performance\n", "\n", "UDFs are a black box to Spark's optimizer, so optimizations cannot be applied to the DataFrame/Dataset operations that use them. When possible, use the Spark SQL built-in functions instead, because these functions benefit from the optimizer. A sketch using the built-in `upper()` function is included under Best practices below." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "### Best practices while using UDFs\n", "\n", "* Make the UDF null-aware and do the null checking inside the UDF itself (first sketch below).\n", "* Use `IF` or `CASE WHEN` expressions to do the null check and invoke the UDF in a conditional branch (second sketch below).\n", "* Create a UDF only when the existing built-in SQL functions are insufficient (third sketch below)." ] },
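{ "cell_type": "markdown", "metadata": {}, "source": [ "The following cells sketch these practices. They are minimal examples rather than a definitive implementation, and they assume the `df`, the `NAME_TABLE` view, and the `convertToUpperCaseUDF` defined earlier in this notebook; the `convertToUpperCaseSafe` and `convertToUpperCaseSafeUDF` names are introduced here purely for illustration." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Practice 1: a null-aware variant of the UDF; it returns null instead of calling a method on a null reference.\n", "val convertToUpperCaseSafe = (str: String) => {\n", "  if (str == null) null else str.toUpperCase()\n", "}\n", "spark.udf.register(\"convertToUpperCaseSafeUDF\", convertToUpperCaseSafe)\n", "spark.sql(\"select No, convertToUpperCaseSafeUDF(Name) as Name from NAME_TABLE\").show(false)\n", "\n", "// Practice 2: alternatively, keep the original UDF and guard it with CASE WHEN so it is only invoked on non-null input.\n", "spark.sql(\"select No, case when Name is not null then convertToUpperCaseUDF(Name) end as Name from NAME_TABLE\").show(false)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Practice 3: prefer built-in functions when they cover the need; upper() produces the same result as\n", "// convertToUpperCaseUDF while staying visible to Spark's optimizer.\n", "import org.apache.spark.sql.functions.upper\n", "\n", "df.select(col(\"No\"), upper(col(\"Name\")).as(\"Name\")).show(false)"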
] } ], "metadata": { "kernelspec": { "display_name": "Spark", "language": "", "name": "sparkkernel" }, "language_info": { "codemirror_mode": "text/x-scala", "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala" } }, "nbformat": 4, "nbformat_minor": 4 }