{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Distributed TensorFlow training with Ray on AWS Glue\n",
    "\n",
    "This notebook starts an AWS Glue interactive session that uses the Ray runtime and trains a small Keras regression model with Ray AIR's `TensorflowTrainer`. Each Ray worker runs `train_func` under `tf.distribute.MultiWorkerMirroredStrategy` on its shard of a toy dataset where `y = 2 * x + 1`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Session configuration\n",
    "\n",
    "These Glue magics select the Ray runtime, set the session name prefix, list extra Python modules to install, and size the worker pool. Run them before any Python code so they apply to the session."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "%glue_ray\n",
    "%session_id_prefix tensorflow-ray\n",
    "%additional_python_modules ray[ml],tensorflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "%min_workers 1\n",
    "%number_of_workers 2\n",
    "%object_memory_worker 10"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Imports\n",
    "\n",
    "`ray` itself is imported explicitly because `ray.data.from_items` is used below to build the training dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import ray\n",
    "import tensorflow as tf\n",
    "\n",
    "from ray.air import session\n",
    "from ray.air.callbacks.keras import Callback\n",
    "from ray.air.config import RunConfig\n",
    "from ray.air.config import ScalingConfig\n",
    "from ray.train.tensorflow import TensorflowTrainer\n",
    "from ray.train.tensorflow import prepare_dataset_shard"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Model and per-worker training loop\n",
    "\n",
    "The model is built and compiled inside `strategy.scope()` so `MultiWorkerMirroredStrategy` manages its variables. `train_func` is executed on every Ray worker and reads that worker's shard of the `\"train\"` dataset via `session.get_dataset_shard`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def build_model() -> tf.keras.Model:\n",
    "    \"\"\"Build a tiny regression MLP mapping a scalar input to a scalar output.\"\"\"\n",
    "    model = tf.keras.Sequential(\n",
    "        [\n",
    "            tf.keras.layers.InputLayer(input_shape=()),\n",
    "            # Add feature dimension, expanding (batch_size,) to (batch_size, 1).\n",
    "            tf.keras.layers.Flatten(),\n",
    "            tf.keras.layers.Dense(10),\n",
    "            tf.keras.layers.Dense(1),\n",
    "        ]\n",
    "    )\n",
    "    return model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_func(config: dict):\n",
    "    \"\"\"Per-worker training loop executed by ``TensorflowTrainer``.\n",
    "\n",
    "    Reads ``batch_size`` (default 64), ``epochs`` (default 3) and ``lr``\n",
    "    (default 1e-3) from ``config``, trains on this worker's shard of the\n",
    "    ``\"train\"`` dataset, and returns a list with one Keras\n",
    "    ``History.history`` dict per epoch.\n",
    "    \"\"\"\n",
    "    batch_size = config.get(\"batch_size\", 64)\n",
    "    epochs = config.get(\"epochs\", 3)\n",
    "\n",
    "    strategy = tf.distribute.MultiWorkerMirroredStrategy()\n",
    "    with strategy.scope():\n",
    "        # Model building/compiling need to be within `strategy.scope()`.\n",
    "        multi_worker_model = build_model()\n",
    "        multi_worker_model.compile(\n",
    "            optimizer=tf.keras.optimizers.SGD(learning_rate=config.get(\"lr\", 1e-3)),\n",
    "            loss=tf.keras.losses.mean_squared_error,\n",
    "            metrics=[tf.keras.metrics.mean_squared_error],\n",
    "        )\n",
    "\n",
    "    dataset = session.get_dataset_shard(\"train\")\n",
    "\n",
    "    def to_tf_dataset(dataset, batch_size):\n",
    "        \"\"\"Wrap a Ray dataset shard as a ``tf.data.Dataset`` of (x, y) float32 batches.\"\"\"\n",
    "\n",
    "        def to_tensor_iterator():\n",
    "            for batch in dataset.iter_tf_batches(\n",
    "                batch_size=batch_size, dtypes=tf.float32\n",
    "            ):\n",
    "                yield batch[\"x\"], batch[\"y\"]\n",
    "\n",
    "        # shape=None means \"unknown shape\" (the original `(None)` was the same\n",
    "        # value, not a 1-tuple).\n",
    "        output_signature = (\n",
    "            tf.TensorSpec(shape=None, dtype=tf.float32),\n",
    "            tf.TensorSpec(shape=None, dtype=tf.float32),\n",
    "        )\n",
    "        tf_dataset = tf.data.Dataset.from_generator(\n",
    "            to_tensor_iterator, output_signature=output_signature\n",
    "        )\n",
    "        return prepare_dataset_shard(tf_dataset)\n",
    "\n",
    "    results = []\n",
    "    for _ in range(epochs):\n",
    "        # A fresh tf.data pipeline is built for each pass, and fit() is called\n",
    "        # without `epochs`, so each call is a single Keras epoch; `epochs`\n",
    "        # therefore controls the total number of passes.\n",
    "        tf_dataset = to_tf_dataset(dataset=dataset, batch_size=batch_size)\n",
    "        history = multi_worker_model.fit(tf_dataset, callbacks=[Callback()])\n",
    "        results.append(history.history)\n",
    "    return results"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Launch training\n",
    "\n",
    "`config` is passed to `train_func` as `train_loop_config`. `num_workers` Ray training workers are started, and `train_dataset` is provided to them under the `\"train\"` key."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Number of Ray training workers (the session above requests 2 Glue workers).\n",
    "num_workers = 2\n",
    "use_gpu = False\n",
    "\n",
    "config = {\"lr\": 1e-3, \"batch_size\": 32, \"epochs\": 4}\n",
    "\n",
    "# Toy linear-regression data: y = 2x + 1 for integer x in [0, 200).\n",
    "train_dataset = ray.data.from_items([{\"x\": x, \"y\": 2 * x + 1} for x in range(200)])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "trainer = TensorflowTrainer(\n",
    "    train_loop_per_worker=train_func,\n",
    "    train_loop_config=config,\n",
    "    scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),\n",
    "    run_config=RunConfig(local_dir=\"/tmp/ray_results\"),\n",
    "    datasets={\"train\": train_dataset},\n",
    ")\n",
    "result = trainer.fit()\n",
    "print(result.metrics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Clean up\n",
    "\n",
    "Stop the interactive session to release its resources."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Glue Python [PySpark and Ray] (SparkAnalytics 1.0)",
   "language": "python",
   "name": "conda-env-sm_glue_is-glue_pyspark__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-sparkanalytics-v1"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "Python_Glue_Session",
   "pygments_lexer": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}