/** * Copyright 2012-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ package com.amazonaws.services.simpleworkflow.flow; import com.amazonaws.services.simpleworkflow.flow.core.Functor; import com.amazonaws.services.simpleworkflow.flow.core.Promise; import com.amazonaws.services.simpleworkflow.flow.core.Settable; import com.amazonaws.services.simpleworkflow.flow.core.Task; import com.amazonaws.services.simpleworkflow.flow.core.TryFinally; import com.amazonaws.services.simpleworkflow.flow.generic.GenericWorkflowClient; import com.amazonaws.services.simpleworkflow.flow.generic.SignalExternalWorkflowParameters; import com.amazonaws.services.simpleworkflow.flow.generic.StartChildWorkflowExecutionParameters; import com.amazonaws.services.simpleworkflow.flow.generic.StartChildWorkflowReply; import com.amazonaws.services.simpleworkflow.model.WorkflowExecution; import com.amazonaws.services.simpleworkflow.model.WorkflowType; public class DynamicWorkflowClientImpl implements DynamicWorkflowClient { protected WorkflowType workflowType; protected GenericWorkflowClient genericClient; protected StartWorkflowOptions schedulingOptions; protected DataConverter dataConverter; protected WorkflowExecution workflowExecution; protected String requestedWorkflowId; protected boolean startAttempted; protected Settable runId = new Settable(); protected DecisionContextProvider decisionContextProvider = new DecisionContextProviderImpl(); public DynamicWorkflowClientImpl() { this(null, null, null, null, null); } public DynamicWorkflowClientImpl(WorkflowExecution workflowExecution) { this(workflowExecution, null, null, null, null); } public DynamicWorkflowClientImpl(WorkflowExecution workflowExecution, WorkflowType workflowType) { this(workflowExecution, workflowType, null, null, null); } public DynamicWorkflowClientImpl(WorkflowExecution workflowExecution, WorkflowType workflowType, StartWorkflowOptions options) { this(workflowExecution, workflowType, options, null, null); } public DynamicWorkflowClientImpl(WorkflowExecution workflowExecution, WorkflowType workflowType, StartWorkflowOptions options, DataConverter dataConverter) { this(workflowExecution, workflowType, options, dataConverter, null); } public DynamicWorkflowClientImpl(WorkflowExecution workflowExecution, WorkflowType workflowType, StartWorkflowOptions options, DataConverter dataConverter, GenericWorkflowClient genericClient) { this.workflowType = workflowType; this.workflowExecution = workflowExecution; if (workflowExecution.getRunId() != null) { this.runId.set(workflowExecution.getRunId()); } if (dataConverter == null) { this.dataConverter = new JsonDataConverter(); } else { this.dataConverter = dataConverter; } this.schedulingOptions = options; this.genericClient = genericClient; } @Override public DataConverter getDataConverter() { return dataConverter; } @Override public StartWorkflowOptions getSchedulingOptions() { return schedulingOptions; } @Override public GenericWorkflowClient getGenericClient() { return genericClient; } @Override public Promise getRunId() { return runId; } @Override public WorkflowExecution getWorkflowExecution() { return workflowExecution; } @Override public WorkflowType getWorkflowType() { return workflowType; } public void setWorkflowType(WorkflowType workflowType) { this.workflowType = workflowType; } public void setGenericClient(GenericWorkflowClient genericClient) { this.genericClient = genericClient; } public void setSchedulingOptions(StartWorkflowOptions schedulingOptions) { this.schedulingOptions = schedulingOptions; } public void setDataConverter(DataConverter dataConverter) { this.dataConverter = dataConverter; } @Override public void requestCancelWorkflowExecution(Promise... waitFor) { checkWorkflowExecution(); new Task(waitFor) { @Override protected void doExecute() throws Throwable { GenericWorkflowClient client = getGenericClientToUse(); client.requestCancelWorkflowExecution(workflowExecution); } }; } private void checkWorkflowExecution() { if (workflowExecution == null) { throw new IllegalStateException("required property workflowExecution is null"); } } public Promise startWorkflowExecution(final Promise[] arguments, final StartWorkflowOptions startOptionsOverride, final Class returnType, final Promise... waitFor) { checkState(); if (runId.isReady()) { runId = new Settable(); workflowExecution.setRunId(null); } return new Functor(arguments) { @Override protected Promise doExecute() throws Throwable { Object[] input = new Object[arguments.length]; for (int i = 0; i < arguments.length; i++) { Promise argument = arguments[i]; input[i] = argument.get(); } return startWorkflowExecution(input, startOptionsOverride, returnType, waitFor); } }; } public Promise startWorkflowExecution(final Object[] arguments, final StartWorkflowOptions startOptionsOverride, final Class returnType, Promise... waitFor) { checkState(); final Settable result = new Settable(); if (runId.isReady()) { runId = new Settable(); workflowExecution.setRunId(null); } new TryFinally(waitFor) { Promise reply; @Override protected void doTry() throws Throwable { StartChildWorkflowExecutionParameters parameters = new StartChildWorkflowExecutionParameters(); parameters.setWorkflowType(workflowType); final String convertedArguments = dataConverter.toData(arguments); parameters.setInput(convertedArguments); if (!startAttempted) { parameters.setWorkflowId(workflowExecution.getWorkflowId()); requestedWorkflowId = workflowExecution.getWorkflowId(); startAttempted = true; } else { // Subsequent attempts (e.g. on retry) use the same workflow id as the initial attempt parameters.setWorkflowId(requestedWorkflowId); workflowExecution.setWorkflowId(requestedWorkflowId); } final StartChildWorkflowExecutionParameters startParameters = parameters.createStartChildWorkflowExecutionParametersFromOptions( schedulingOptions, startOptionsOverride); GenericWorkflowClient client = getGenericClientToUse(); reply = client.startChildWorkflow(startParameters); runId.setDescription("runId of " + reply.getDescription()); result.setDescription(reply.getDescription()); new Task(reply) { @Override protected void doExecute() throws Throwable { StartChildWorkflowReply r = reply.get(); if (!runId.isReady()) { runId.set(r.getRunId()); workflowExecution.setRunId(r.getRunId()); workflowExecution.setWorkflowId(r.getWorkflowId()); } } }; } @Override protected void doCatch(Throwable e) throws Throwable { if (e instanceof ChildWorkflowFailedException) { ChildWorkflowFailedException taskFailedException = (ChildWorkflowFailedException) e; try { String details = taskFailedException.getDetails(); if (details != null) { Throwable cause = dataConverter.fromData(details, Throwable.class); if (cause != null && taskFailedException.getCause() == null) { taskFailedException.initCause(cause); } } } catch (DataConverterException dataConverterException) { if (dataConverterException.getCause() == null) { dataConverterException.initCause(taskFailedException); } throw dataConverterException; } } throw e; } @Override protected void doFinally() throws Throwable { if (reply != null && reply.isReady() && reply.get().getResult().isReady()) { if (returnType.equals(Void.class)) { result.set(null); } else { T output = dataConverter.fromData(reply.get().getResult().get(), returnType); result.set(output); } } } }; return result; } @Override public void signalWorkflowExecution(final String signalName, final Object[] arguments, Promise... waitFor) { checkWorkflowExecution(); new Task(waitFor) { @Override protected void doExecute() throws Throwable { SignalExternalWorkflowParameters parameters = new SignalExternalWorkflowParameters(); parameters.setSignalName(signalName); String input = dataConverter.toData(arguments); parameters.setInput(input); parameters.setWorkflowId(workflowExecution.getWorkflowId()); parameters.setRunId(workflowExecution.getRunId()); GenericWorkflowClient client = getGenericClientToUse(); client.signalWorkflowExecution(parameters); } }; } private void checkState() { if (workflowType == null) { throw new IllegalStateException("required property workflowType is null"); } checkWorkflowExecution(); } private GenericWorkflowClient getGenericClientToUse() { GenericWorkflowClient client; if (genericClient == null) { client = decisionContextProvider.getDecisionContext().getWorkflowClient(); } else { client = genericClient; } return client; } }