apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: spark-template
  namespace: spark
spec:
  templates:
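  # Four sizing tiers are defined below: smallJob, mediumJob and largeJob submit Spark in
  # client mode against the in-cluster Kubernetes API, while sparkLocal runs Spark entirely
  # inside the Argo pod. Callers would typically reference one of these templates via a
  # templateRef and override the input parameters (usage assumption; a commented example
  # is sketched at the end of this file).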
  - name: smallJob
    retryStrategy:
      limit: 3
      retryPolicy: "Always"
    inputs:
      # override defaults here
      parameters:
      - name: jobId
      - name: configUri
      - name: image
        value: ghcr.io/tripl-ai/arc:latest
      - name: pullPolicy
        value: "Always"    
      - name: executorInstances
        value: "1"
      - name: executorCores
        value: "1"
      - name: executorMemory
        value: "1"
      - name: sparkConf
        value: ""
      - name: tags
        value: ""
      - name: parameters
        value: ""
      # to execute individual stages from a Jupyter notebook, control which stages run by matching the environment. Some stages may not be required in the prod environment.
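      # e.g. a stage in the Arc job config can declare environments: [test] so that it runs
      # when environment=test here but is skipped when a caller overrides environment with a
      # production value (illustrative values; actual names depend on the job config)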
      - name: environment
        value: test  
    metadata:
      labels:
        app: spark
        workflowId: "{{workflow.uid}}"
    script:
      resources:
        limits:
          cpu: "1"
          memory: "1Gi"
      image: "{{inputs.parameters.image}}"
      command: ["/bin/sh"]
      source: |
        # verbose logging
        set -ex

        # print current hostname and ip
        hostname
        hostname -I

        # submit job
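        # client deploy mode: this pod acts as the Spark driver and requests executor pods
        # from the in-cluster API server (k8s://kubernetes.default.svc:443) in the workflow's
        # namespace, authenticating with the mounted service account token and CA certificate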
        /opt/spark/bin/spark-submit \
        --master k8s://kubernetes.default.svc:443 \
        --deploy-mode client \
        --class ai.tripl.arc.ARC \
        --name arc \
        --conf spark.authenticate=true \
        --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.driver.host=$(hostname -I)  \
        --conf spark.driver.memory=921m \
        --conf spark.executor.cores={{inputs.parameters.executorCores}} \
        --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.executor.instances={{inputs.parameters.executorInstances}} \
        --conf spark.executor.memory={{inputs.parameters.executorMemory}}G \
        --conf spark.io.encryption.enabled=true \
        --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName={{workflow.serviceAccountName}} \
        --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
        --conf spark.kubernetes.container.image.pullPolicy={{inputs.parameters.pullPolicy}} \
        --conf spark.kubernetes.container.image={{inputs.parameters.image}} \
        --conf spark.kubernetes.driver.limit.cores=1 \
        --conf spark.kubernetes.driver.pod.name=$(hostname) \
        --conf spark.kubernetes.executor.label.workflowId={{workflow.uid}} \
        --conf spark.kubernetes.executor.limit.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.executor.podNamePrefix=$(hostname) \
        --conf spark.kubernetes.executor.request.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.local.dirs.tmpfs=true \
        --conf spark.kubernetes.namespace={{workflow.namespace}} \
        --conf spark.network.crypto.enabled=true \
        --conf spark.sql.ansi.enabled=true \
        {{inputs.parameters.sparkConf}} \
        local:///opt/spark/jars/arc.jar \
        --etl.config.uri={{inputs.parameters.configUri}} \
        --etl.config.job.id={{inputs.parameters.jobId}} \
        --etl.config.environment={{inputs.parameters.environment}} \
        --etl.config.ignoreEnvironments=false \
        --etl.config.tags="service=arc workflowId={{workflow.uid}} pod={{pod.name}} serviceAccount={{workflow.serviceAccountName}} namespace={{workflow.namespace}} {{inputs.parameters.tags}}" \
        --ETL_CONF_EPOCH=$(date '+%s') --ETL_CONF_CURRENT_TIMESTAMP="'$(date -u '+%Y-%m-%d %H:%M:%S')'" \
        {{inputs.parameters.parameters}}

  - name: mediumJob
    retryStrategy:
      limit: 3
      retryPolicy: "Always"
    inputs:
      # override defaults here
      parameters:
      - name: jobId
      - name: configUri
      - name: image
        value: ghcr.io/tripl-ai/arc:latest
      - name: pullPolicy
        value: "Always"    
      - name: executorInstances
        value: "2"
      - name: executorCores
        value: "2"
      - name: executorMemory
        value: "10"
      - name: sparkConf
        value: ""
      - name: tags
        value: ""
      - name: parameters
        value: ""
      # to execute individual stages from a Jupyter notebook, control which stages run by matching the environment. Some stages may not be required in the prod environment.
      - name: environment
        value: test  
    metadata:
      labels:
        app: spark
        workflowId: "{{workflow.uid}}"
    script:
      resources:
        limits:
          cpu: "2"
          memory: "13Gi"
      image: "{{inputs.parameters.image}}"
      command: ["/bin/sh"]
      source: |
        # verbose logging
        set -ex

        # print current hostname and ip
        hostname
        hostname -I

        # submit job
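        # same submit invocation as smallJob, sized up to a 2g driver and the medium executor defaults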
        /opt/spark/bin/spark-submit \
        --master k8s://kubernetes.default.svc:443 \
        --deploy-mode client \
        --class ai.tripl.arc.ARC \
        --name arc \
        --conf spark.authenticate=true \
        --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.driver.host=$(hostname -I)  \
        --conf spark.driver.memory=2g \
        --conf spark.executor.cores={{inputs.parameters.executorCores}} \
        --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.executor.instances={{inputs.parameters.executorInstances}} \
        --conf spark.executor.memory={{inputs.parameters.executorMemory}}G \
        --conf spark.io.encryption.enabled=true \
        --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName={{workflow.serviceAccountName}} \
        --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
        --conf spark.kubernetes.container.image.pullPolicy={{inputs.parameters.pullPolicy}} \
        --conf spark.kubernetes.container.image={{inputs.parameters.image}} \
        --conf spark.kubernetes.driver.limit.cores=1 \
        --conf spark.kubernetes.driver.pod.name=$(hostname) \
        --conf spark.kubernetes.executor.label.workflowId={{workflow.uid}} \
        --conf spark.kubernetes.executor.limit.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.executor.podNamePrefix=$(hostname) \
        --conf spark.kubernetes.executor.request.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.local.dirs.tmpfs=true \
        --conf spark.kubernetes.namespace={{workflow.namespace}} \
        --conf spark.network.crypto.enabled=true \
        --conf spark.sql.ansi.enabled=true \
        {{inputs.parameters.sparkConf}} \
        local:///opt/spark/jars/arc.jar \
        --etl.config.uri={{inputs.parameters.configUri}} \
        --etl.config.job.id={{inputs.parameters.jobId}} \
        --etl.config.environment={{inputs.parameters.environment}} \
        --etl.config.ignoreEnvironments=false \
        --etl.config.tags="service=arc workflowId={{workflow.uid}} pod={{pod.name}} serviceAccount={{workflow.serviceAccountName}} namespace={{workflow.namespace}} {{inputs.parameters.tags}}" \
        --ETL_CONF_EPOCH=$(date '+%s') --ETL_CONF_CURRENT_TIMESTAMP="'$(date -u '+%Y-%m-%d %H:%M:%S')'" \
        {{inputs.parameters.parameters}}
  
  - name: largeJob
    retryStrategy:
      limit: 3
      retryPolicy: "Always"
    inputs:
      # override defaults here
      parameters:
      - name: jobId
      - name: configUri
      - name: image
        value: ghcr.io/tripl-ai/arc:latest
      - name: pullPolicy
        value: "Always"  
      - name: executorInstances
        value: "3"
      - name: executorCores
        value: "2"
      - name: executorMemory
        value: "12"
      - name: sparkConf
        value: ""
      - name: tags
        value: ""
      - name: parameters
        value: ""
      # to execute individual stages from a Jupyter notebook, control which stages run by matching the environment. Some stages may not be required in the prod environment.
      - name: environment
        value: test  
    metadata:
      labels:
        app: spark
        workflowId: "{{workflow.uid}}"
    script:
      resources:
        limits:
          cpu: "3"
          memory: "13Gi"
      image: "{{inputs.parameters.image}}"
      command: ["/bin/sh"]
      source: |
        # verbose logging
        set -ex

        # print current hostname and ip
        hostname
        hostname -I

        # submit job
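        # same submit invocation as smallJob, sized up to a 4g driver and the large executor defaults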
        /opt/spark/bin/spark-submit \
        --master k8s://kubernetes.default.svc:443 \
        --deploy-mode client \
        --class ai.tripl.arc.ARC \
        --name arc \
        --conf spark.authenticate=true \
        --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.driver.host=$(hostname -I)  \
        --conf spark.driver.memory=4g \
        --conf spark.executor.cores={{inputs.parameters.executorCores}} \
        --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
        --conf spark.executor.instances={{inputs.parameters.executorInstances}} \
        --conf spark.executor.memory={{inputs.parameters.executorMemory}}G \
        --conf spark.io.encryption.enabled=true \
        --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName={{workflow.serviceAccountName}} \
        --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
        --conf spark.kubernetes.container.image.pullPolicy={{inputs.parameters.pullPolicy}} \
        --conf spark.kubernetes.container.image={{inputs.parameters.image}} \
        --conf spark.kubernetes.driver.limit.cores=1 \
        --conf spark.kubernetes.driver.pod.name=$(hostname) \
        --conf spark.kubernetes.executor.label.workflowId={{workflow.uid}} \
        --conf spark.kubernetes.executor.limit.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.executor.podNamePrefix=$(hostname) \
        --conf spark.kubernetes.executor.request.cores={{inputs.parameters.executorCores}} \
        --conf spark.kubernetes.local.dirs.tmpfs=true \
        --conf spark.kubernetes.namespace={{workflow.namespace}} \
        --conf spark.network.crypto.enabled=true \
        --conf spark.sql.ansi.enabled=true \
        {{inputs.parameters.sparkConf}} \
        local:///opt/spark/jars/arc.jar \
        --etl.config.uri={{inputs.parameters.configUri}} \
        --etl.config.job.id={{inputs.parameters.jobId}} \
        --etl.config.environment={{inputs.parameters.environment}} \
        --etl.config.ignoreEnvironments=false \
        --etl.config.tags="service=arc workflowId={{workflow.uid}} pod={{pod.name}} serviceAccount={{workflow.serviceAccountName}} namespace={{workflow.namespace}} {{inputs.parameters.tags}}" \
        --ETL_CONF_EPOCH=$(date '+%s') --ETL_CONF_CURRENT_TIMESTAMP="'$(date -u '+%Y-%m-%d %H:%M:%S')'" \
        {{inputs.parameters.parameters}}
          
  - name: sparkLocal
    retryStrategy:
      limit: 3
      retryPolicy: "Always"
    inputs:
      # override defaults here
      parameters:
      - name: jobId
      - name: configUri
      - name: image
        value: ghcr.io/tripl-ai/arc:latest
      - name: executorInstances
        value: "1"
      - name: executorCores
        value: "1"
      - name: executorMemory
        value: "1"
      - name: sparkConf
        value: ""
      - name: tags
        value: ""
      - name: parameters
        value: ""
      - name: pullPolicy
        value: IfNotPresent
      - name: environment
        value: test   
    metadata:
      labels:
        app: spark 
        workflowId: "{{workflow.uid}}"
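    # local mode (master local[executorCores]) runs the driver and executors inside this one
    # container, so the main container is sized from the executor parameters via podSpecPatch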
    podSpecPatch: |
      containers:
        - name: main
          resources:
            requests:
              cpu: "{{inputs.parameters.executorCores}}"
              memory: "{{inputs.parameters.executorMemory}}Gi"
    script:
      image: "{{inputs.parameters.image}}"
      command: ["/bin/sh"]
      source: |
        # verbose logging
        set -ex

        # print current hostname and ip
        hostname
        hostname -I

        # submit job
        # driver memory is set at 90% of executorMemory
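        # e.g. executorMemory=1 -> 1 * 1024 * 90/100 = 921m (shell integer arithmetic)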
        /opt/spark/bin/spark-submit \
        --master local[{{inputs.parameters.executorCores}}] \
        --driver-memory $(({{inputs.parameters.executorMemory}} * 1024 * 90/100))m \
        --driver-java-options "-XX:+UseG1GC" \
        --class ai.tripl.arc.ARC \
        --name arc \
        --conf spark.driver.host=$(hostname -I)  \
        --conf spark.driver.pod.name=$(hostname)-driver \
        --conf spark.io.encryption.enabled=true \
        --conf spark.sql.adaptive.enabled=true \
        --conf spark.network.crypto.enabled=true \
        --conf spark.ui.enabled=true \
        --conf spark.sql.ansi.enabled=true \
        {{inputs.parameters.sparkConf}} \
        local:///opt/spark/jars/arc.jar \
        --etl.config.uri={{inputs.parameters.configUri}} \
        --etl.config.job.id={{inputs.parameters.jobId}} \
        --etl.config.environment={{inputs.parameters.environment}} \
        --etl.config.ignoreEnvironments=false \
        --etl.config.tags="service=arc workflowId={{workflow.uid}} pod={{pod.name}} serviceAccount={{workflow.serviceAccountName}} namespace={{workflow.namespace}} {{inputs.parameters.tags}}" \
        --ETL_CONF_EPOCH=$(date '+%s') --ETL_CONF_CURRENT_TIMESTAMP="'$(date -u '+%Y-%m-%d %H:%M:%S')'" \
        {{inputs.parameters.parameters}}
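
# Illustrative usage (a minimal sketch, not part of this template): a Workflow that invokes
# the sparkLocal template via templateRef. The jobId and configUri values below are
# hypothetical placeholders.
#
# apiVersion: argoproj.io/v1alpha1
# kind: Workflow
# metadata:
#   generateName: arc-job-
#   namespace: spark
# spec:
#   entrypoint: main
#   templates:
#   - name: main
#     dag:
#       tasks:
#       - name: run-arc
#         templateRef:
#           name: spark-template
#           template: sparkLocal
#         arguments:
#           parameters:
#           - name: jobId
#             value: "0"
#           - name: configUri
#             value: s3a://my-bucket/jobs/example-job.json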