apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: spark-template
  namespace: spark
spec:
  templates:

  - name: smallJob
    retryStrategy:
      limit: 3
      retryPolicy: "Always"
    inputs:
      # override defaults here
      parameters:
      - name: jobId
      - name: configUri
      - name: image
        value: ghcr.io/tripl-ai/arc:latest
      - name: pullPolicy
        value: "Always"
      - name: executorInstances
        value: "1"
      - name: executorCores
        value: "1"
      - name: executorMemory
        value: "1"
      - name: sparkConf
        value: ""
      - name: tags
        value: ""
      - name: parameters
        value: ""
      # to execute each stage from a Jupyter notebook, we can control it by matching the environment; some stages may not be required in the prod environment
      - name: environment
        value: test
    metadata:
      labels:
        app: spark
        workflowId: "{{workflow.uid}}"
    script:
      resources:
        limits:
          cpu: "1"
          memory: "1Gi"
      image: "{{inputs.parameters.image}}"
      command: ["/bin/sh"]
      source: |
        # verbose logging
        set -ex

        # print current hostname and ip
        hostname
        hostname -I

        # submit job
        /opt/spark/bin/spark-submit \
        --master k8s://kubernetes.default.svc:443 \
        --deploy-mode client \
        --class ai.tripl.arc.ARC \
        --name arc \
        --conf spark.authenticate=true \
        --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.driver.host=$(hostname -I) \
        --conf spark.driver.memory=921m \
        --conf spark.executor.cores={{inputs.parameters.executorCores}} \
        --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.executor.instances={{inputs.parameters.executorInstances}} \
        --conf spark.executor.memory={{inputs.parameters.executorMemory}}G \
        --conf spark.io.encryption.enabled=true \
        --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName={{workflow.serviceAccountName}} \
        --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
        --conf spark.kubernetes.container.image.pullPolicy={{inputs.parameters.pullPolicy}} \
        --conf spark.kubernetes.container.image={{inputs.parameters.image}} \
        --conf spark.kubernetes.driver.limit.cores=1 \
        --conf spark.kubernetes.driver.pod.name=$(hostname) \
        --conf spark.kubernetes.executor.label.workflowId={{workflow.uid}} \
        --conf spark.kubernetes.executor.limit.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.executor.podNamePrefix=$(hostname) \
        --conf spark.kubernetes.executor.request.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.local.dirs.tmpfs=true \
        --conf spark.kubernetes.namespace={{workflow.namespace}} \
        --conf spark.network.crypto.enabled=true \
        --conf spark.sql.ansi.enabled=true \
        {{inputs.parameters.sparkConf}} \
        local:///opt/spark/jars/arc.jar \
        --etl.config.uri={{inputs.parameters.configUri}} \
        --etl.config.job.id={{inputs.parameters.jobId}} \
        --etl.config.environment={{inputs.parameters.environment}} \
        --etl.config.ignoreEnvironments=false \
        --etl.config.tags="service=arc workflowId={{workflow.uid}} pod={{pod.name}} serviceAccount={{workflow.serviceAccountName}} namespace={{workflow.namespace}} {{inputs.parameters.tags}}" \
        --ETL_CONF_EPOCH=$(date '+%s') --ETL_CONF_CURRENT_TIMESTAMP="'$(date -u '+%Y-%m-%d %H:%M:%S')'" \
        {{inputs.parameters.parameters}}

  - name: mediumJob
    retryStrategy:
      limit: 3
      retryPolicy: "Always"
    inputs:
      # override defaults here
      parameters:
      - name: jobId
      - name: configUri
      - name: image
        value: ghcr.io/tripl-ai/arc:latest
      - name: pullPolicy
        value: "Always"
      - name: executorInstances
        value: "2"
      - name: executorCores
        value: "2"
      - name: executorMemory
        value: "10"
      - name: sparkConf
        value: ""
      - name: tags
        value: ""
      - name: parameters
        value: ""
      # to execute each stage from a Jupyter notebook, we can control it by matching the environment; some stages may not be required in the prod environment
      - name: environment
        value: test
    metadata:
      labels:
        app: spark
        workflowId: "{{workflow.uid}}"
    script:
      resources:
        limits:
          cpu: "2"
          memory: "13Gi"
      image: "{{inputs.parameters.image}}"
      command: ["/bin/sh"]
      source: |
        # verbose logging
        set -ex

        # print current hostname and ip
        hostname
        hostname -I

        # submit job
        /opt/spark/bin/spark-submit \
        --master k8s://kubernetes.default.svc:443 \
        --deploy-mode client \
        --class ai.tripl.arc.ARC \
        --name arc \
        --conf spark.authenticate=true \
        --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.driver.host=$(hostname -I) \
        --conf spark.driver.memory=2g \
        --conf spark.executor.cores={{inputs.parameters.executorCores}} \
        --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.executor.instances={{inputs.parameters.executorInstances}} \
        --conf spark.executor.memory={{inputs.parameters.executorMemory}}G \
        --conf spark.io.encryption.enabled=true \
        --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName={{workflow.serviceAccountName}} \
        --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
        --conf spark.kubernetes.container.image.pullPolicy={{inputs.parameters.pullPolicy}} \
        --conf spark.kubernetes.container.image={{inputs.parameters.image}} \
        --conf spark.kubernetes.driver.limit.cores=1 \
        --conf spark.kubernetes.driver.pod.name=$(hostname) \
        --conf spark.kubernetes.executor.label.workflowId={{workflow.uid}} \
        --conf spark.kubernetes.executor.limit.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.executor.podNamePrefix=$(hostname) \
        --conf spark.kubernetes.executor.request.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.local.dirs.tmpfs=true \
        --conf spark.kubernetes.namespace={{workflow.namespace}} \
        --conf spark.network.crypto.enabled=true \
        --conf spark.sql.ansi.enabled=true \
        {{inputs.parameters.sparkConf}} \
        local:///opt/spark/jars/arc.jar \
        --etl.config.uri={{inputs.parameters.configUri}} \
        --etl.config.job.id={{inputs.parameters.jobId}} \
        --etl.config.environment={{inputs.parameters.environment}} \
        --etl.config.ignoreEnvironments=false \
        --etl.config.tags="service=arc workflowId={{workflow.uid}} pod={{pod.name}} serviceAccount={{workflow.serviceAccountName}} namespace={{workflow.namespace}} {{inputs.parameters.tags}}" \
        --ETL_CONF_EPOCH=$(date '+%s') --ETL_CONF_CURRENT_TIMESTAMP="'$(date -u '+%Y-%m-%d %H:%M:%S')'" \
        {{inputs.parameters.parameters}}

  - name: largeJob
    retryStrategy:
      limit: 3
      retryPolicy: "Always"
    inputs:
      # override defaults here
      parameters:
      - name: jobId
      - name: configUri
      - name: image
        value: ghcr.io/tripl-ai/arc:latest
      - name: pullPolicy
        value: "Always"
      - name: executorInstances
        value: "3"
      - name: executorCores
        value: "2"
      - name: executorMemory
        value: "12"
      - name: sparkConf
        value: ""
      - name: tags
        value: ""
      - name: parameters
        value: ""
      # to execute each stage from a Jupyter notebook, we can control it by matching the environment; some stages may not be required in the prod environment
      - name: environment
        value: test
    metadata:
      labels:
        app: spark
        workflowId: "{{workflow.uid}}"
    script:
      resources:
        limits:
          cpu: "3"
          memory: "13Gi"
      image: "{{inputs.parameters.image}}"
      command: ["/bin/sh"]
      source: |
        # verbose logging
        set -ex

        # print current hostname and ip
        hostname
        hostname -I

        # submit job
        /opt/spark/bin/spark-submit \
        --master k8s://kubernetes.default.svc:443 \
        --deploy-mode client \
        --class ai.tripl.arc.ARC \
        --name arc \
        --conf spark.authenticate=true \
        --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.driver.host=$(hostname -I) \
        --conf spark.driver.memory=4g \
        --conf spark.executor.cores={{inputs.parameters.executorCores}} \
        --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.executor.instances={{inputs.parameters.executorInstances}} \
        --conf spark.executor.memory={{inputs.parameters.executorMemory}}G \
        --conf spark.io.encryption.enabled=true \
        --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName={{workflow.serviceAccountName}} \
        --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
        --conf spark.kubernetes.container.image.pullPolicy={{inputs.parameters.pullPolicy}} \
        --conf spark.kubernetes.container.image={{inputs.parameters.image}} \
        --conf spark.kubernetes.driver.limit.cores=1 \
        --conf spark.kubernetes.driver.pod.name=$(hostname) \
        --conf spark.kubernetes.executor.label.workflowId={{workflow.uid}} \
        --conf spark.kubernetes.executor.limit.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.executor.podNamePrefix=$(hostname) \
        --conf spark.kubernetes.executor.request.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.local.dirs.tmpfs=true \
        --conf spark.kubernetes.namespace={{workflow.namespace}} \
        --conf spark.network.crypto.enabled=true \
        --conf spark.sql.ansi.enabled=true \
        {{inputs.parameters.sparkConf}} \
        local:///opt/spark/jars/arc.jar \
        --etl.config.uri={{inputs.parameters.configUri}} \
        --etl.config.job.id={{inputs.parameters.jobId}} \
        --etl.config.environment={{inputs.parameters.environment}} \
        --etl.config.ignoreEnvironments=false \
        --etl.config.tags="service=arc workflowId={{workflow.uid}} pod={{pod.name}} serviceAccount={{workflow.serviceAccountName}} namespace={{workflow.namespace}} {{inputs.parameters.tags}}" \
        --ETL_CONF_EPOCH=$(date '+%s') --ETL_CONF_CURRENT_TIMESTAMP="'$(date -u '+%Y-%m-%d %H:%M:%S')'" \
        {{inputs.parameters.parameters}}

  - name: sparkLocal
    retryStrategy:
      limit: 3
      retryPolicy: "Always"
    inputs:
      # override defaults here
      parameters:
      - name: jobId
      - name: configUri
      - name: image
        value: ghcr.io/tripl-ai/arc:latest
      - name: executorInstances
        value: "1"
      - name: executorCores
        value: "1"
      - name: executorMemory
        value: "1"
      - name: sparkConf
        value: ""
      - name: tags
        value: ""
      - name: parameters
        value: ""
      - name: pullPolicy
        value: IfNotPresent
      - name: environment
        value: test
    metadata:
      labels:
        app: spark
        workflowId: "{{workflow.uid}}"
    podSpecPatch: |
      containers:
      - name: main
        resources:
          requests:
            cpu: "{{inputs.parameters.executorCores}}"
            memory: "{{inputs.parameters.executorMemory}}Gi"
    script:
      image: "{{inputs.parameters.image}}"
      command: ["/bin/sh"]
      source: |
        # verbose logging
        set -ex

        # print current hostname and ip
        hostname
        hostname -I

        # submit job
        # driver memory is set at 90% of executorMemory
        /opt/spark/bin/spark-submit \
        --master local[{{inputs.parameters.executorCores}}] \
        --driver-memory $(({{inputs.parameters.executorMemory}} * 1024 * 90/100))m \
        --driver-java-options "-XX:+UseG1GC" \
        --class ai.tripl.arc.ARC \
        --name arc \
        --conf spark.driver.host=$(hostname -I) \
        --conf spark.driver.pod.name=$(hostname)-driver \
        --conf spark.io.encryption.enabled=true \
        --conf spark.sql.adaptive.enabled=true \
        --conf spark.network.crypto.enabled=true \
        --conf spark.ui.enabled=true \
        --conf spark.sql.ansi.enabled=true \
        {{inputs.parameters.sparkConf}} \
        local:///opt/spark/jars/arc.jar \
        --etl.config.uri={{inputs.parameters.configUri}} \
        --etl.config.job.id={{inputs.parameters.jobId}} \
        --etl.config.environment={{inputs.parameters.environment}} \
        --etl.config.ignoreEnvironments=false \
        --etl.config.tags="service=arc workflowId={{workflow.uid}} pod={{pod.name}} serviceAccount={{workflow.serviceAccountName}} namespace={{workflow.namespace}} {{inputs.parameters.tags}}" \
        --ETL_CONF_EPOCH=$(date '+%s') --ETL_CONF_CURRENT_TIMESTAMP="'$(date -u '+%Y-%m-%d %H:%M:%S')'" \
        {{inputs.parameters.parameters}}
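# Example usage (a minimal sketch, not part of the template above): a Workflow that invokes the
# smallJob template via templateRef, overriding only the required parameters. The jobId and
# configUri values below are hypothetical placeholders.
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: spark-example-
  namespace: spark
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: small-job
        templateRef:
          name: spark-template
          template: smallJob
        arguments:
          parameters:
          - name: jobId
            value: "example-job"               # hypothetical job identifier
          - name: configUri
            value: "s3a://my-bucket/job.json"  # hypothetical Arc job config location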