diff --git a/backend/src/main/scala/cromwell/backend/backend.scala b/backend/src/main/scala/cromwell/backend/backend.scala index 601207362..30317d068 100644 --- a/backend/src/main/scala/cromwell/backend/backend.scala +++ b/backend/src/main/scala/cromwell/backend/backend.scala @@ -141,6 +141,8 @@ object CommonBackendConfigurationAttributes { "default-runtime-attributes.noAddress", "default-runtime-attributes.docker", "default-runtime-attributes.queueArn", + "default-runtime-attributes.awsBatchRetryAttempts", + "default-runtime-attributes.ulimits", "default-runtime-attributes.failOnStderr", "slow-job-warning-time", "dockerhub", diff --git a/docs/RuntimeAttributes.md b/docs/RuntimeAttributes.md index 7675cc767..1bce8a7b3 100644 --- a/docs/RuntimeAttributes.md +++ b/docs/RuntimeAttributes.md @@ -56,6 +56,9 @@ There are a number of additional runtime attributes that apply to the Google Clo - [useDockerImageCache](#usedockerimagecache) +### AWS Specific Attributes +- [awsBatchRetryAttempts](#awsBatchRetryAttempts) +- [ulimits](#ulimits) ## Expression support @@ -323,8 +326,6 @@ runtime { ``` - - ### `bootDiskSizeGb` In addition to working disks, Google Cloud allows specification of a boot disk size. This is the disk where the docker image itself is booted (**not the working directory of your task on the VM**). @@ -373,6 +374,54 @@ runtime { } ``` + +### `awsBatchRetryAttempts` + +*Default: _0_* + +This runtime attribute adds support for [*AWS Batch Automated Job Retries*](https://docs.aws.amazon.com/batch/latest/userguide/job_retries.html), which makes it possible to tackle transient job failures. For example, if a task fails due to a timeout from accessing an external service, then this option helps re-run the failed task without having to re-run the entire workflow. It takes an integer between 1 and 10 that indicates the maximum number of times AWS Batch should retry a failed task. If the value 0 is passed, the [*Retry Strategy*](https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#retryStrategy) will not be added to the job definition and the task will run just once. + +``` +runtime { + awsBatchRetryAttempts: integer +} +``` + + +### `ulimits` + +*Default: _empty_* + +A list of [`ulimits`](https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#containerProperties) values to set in the container. This parameter maps to `Ulimits` in the [Create a container](https://docs.docker.com/engine/api/v1.38/) section of the [Docker Remote API](https://docs.docker.com/engine/api/v1.38/) and the `--ulimit` option to [docker run](https://docs.docker.com/engine/reference/commandline/run/). + +``` +"ulimits": [ + { + "name": string, + "softLimit": integer, + "hardLimit": integer + } + ... +] +``` +Parameter description: + +- `name` + - The `type` of the `ulimit`. + - Type: String + - Required: Yes, when `ulimits` is used. + +- `softLimit` + - The soft limit for the `ulimit` type. + - Type: Integer + - Required: Yes, when `ulimits` is used. + +- `hardLimit` + - The hard limit for the `ulimit` type. + - Type: Integer + - Required: Yes, when `ulimits` is used. + + #### How to Setup Configure your Google network to use "Private Google Access". This will allow your VMs to access Google Services including Google Container Registry, as well as Dockerhub images.
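For reference, the two AWS-specific attributes documented above can be combined in a single task `runtime` section. A minimal sketch with purely illustrative values (retry up to three times and raise the open-file limit):

```
runtime {
  docker: "ubuntu:latest"
  awsBatchRetryAttempts: 3
  ulimits: [
    {
      "name": "nofile",
      "softLimit": "10240",
      "hardLimit": "10240"
    }
  ]
}
```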
diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/AmazonS3Factory.java b/filesystems/s3/src/main/java/org/lerch/s3fs/AmazonS3Factory.java index 987e8d71c..9d5255876 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/AmazonS3Factory.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/AmazonS3Factory.java @@ -1,20 +1,16 @@ package org.lerch.s3fs; - + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsCredentials; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.*; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.S3Configuration; -import software.amazon.awssdk.http.SdkHttpClient; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import java.net.URI; import java.util.Properties; diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/S3AccessControlList.java b/filesystems/s3/src/main/java/org/lerch/s3fs/S3AccessControlList.java index 0d908001e..1f2191647 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/S3AccessControlList.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/S3AccessControlList.java @@ -1,14 +1,14 @@ package org.lerch.s3fs; -import static java.lang.String.format; +import software.amazon.awssdk.services.s3.model.Grant; +import software.amazon.awssdk.services.s3.model.Owner; +import software.amazon.awssdk.services.s3.model.Permission; import java.nio.file.AccessDeniedException; import java.nio.file.AccessMode; import java.util.EnumSet; -import software.amazon.awssdk.services.s3.model.Grant; -import software.amazon.awssdk.services.s3.model.Owner; -import software.amazon.awssdk.services.s3.model.Permission; +import static java.lang.String.format; public class S3AccessControlList { private String fileStoreName; diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileChannel.java b/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileChannel.java index 2f8bd41ff..e84929479 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileChannel.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileChannel.java @@ -1,8 +1,15 @@ package org.lerch.s3fs; import org.apache.tika.Tika; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import java.io.*; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; @@ -14,13 +21,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; -import 
software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.core.ResponseInputStream; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.S3Object; - import static java.lang.String.format; public class S3FileChannel extends FileChannel implements S3Channel { @@ -46,7 +46,7 @@ public class S3FileChannel extends FileChannel implements S3Channel { boolean removeTempFile = true; try { if (exists) { - try (ResponseInputStream byteStream = path.getFileSystem() + try (ResponseInputStream byteStream = path.getFileStore() .getClient() .getObject(GetObjectRequest .builder() @@ -171,7 +171,7 @@ public class S3FileChannel extends FileChannel implements S3Channel { .contentLength(length) .contentType(new Tika().detect(stream, path.getFileName().toString())); - path.getFileSystem().getClient().putObject(builder.build(), RequestBody.fromInputStream(stream, length)); + path.getFileStore().getClient().putObject(builder.build(), RequestBody.fromInputStream(stream, length)); } } } diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileStore.java b/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileStore.java index d8082d060..8e1599918 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileStore.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileStore.java @@ -1,28 +1,37 @@ package org.lerch.s3fs; +import org.lerch.s3fs.util.S3ClientStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.model.*; + import java.io.IOException; +import java.net.URI; import java.nio.file.FileStore; import java.nio.file.attribute.FileAttributeView; import java.nio.file.attribute.FileStoreAttributeView; import java.util.Date; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.Bucket; -import software.amazon.awssdk.services.s3.model.GetBucketAclRequest; -import software.amazon.awssdk.services.s3.model.HeadBucketRequest; -import software.amazon.awssdk.services.s3.model.ListBucketsRequest; -import software.amazon.awssdk.services.s3.model.NoSuchBucketException; -import software.amazon.awssdk.services.s3.model.Owner; -import com.google.common.collect.ImmutableList; - +/** + * In S3 a filestore translates to a bucket + */ public class S3FileStore extends FileStore implements Comparable { private S3FileSystem fileSystem; private String name; + private S3Client defaultClient; + private final Logger logger = LoggerFactory.getLogger("S3FileStore"); public S3FileStore(S3FileSystem s3FileSystem, String name) { this.fileSystem = s3FileSystem; this.name = name; + // the default client can be used for getBucketLocation operations + this.defaultClient = S3Client.builder().endpointOverride(URI.create("https://s3.us-east-1.amazonaws.com")).region(Region.US_EAST_1).build(); } @Override @@ -111,8 +120,8 @@ public class S3FileStore extends FileStore implements Comparable { // model as HeadBucket is now required boolean bucket = false; try { - getClient().headBucket(HeadBucketRequest.builder().bucket(bucketName).build()); - 
bucket = true; + getClient().headBucket(HeadBucketRequest.builder().bucket(bucketName).build()); + bucket = true; }catch(NoSuchBucketException nsbe) {} return bucket; } @@ -121,8 +130,13 @@ public class S3FileStore extends FileStore implements Comparable { return new S3Path(fileSystem, "/" + this.name()); } - private S3Client getClient() { - return fileSystem.getClient(); + /** + * Gets a client suitable for this FileStore (bucket) including configuring the correct region endpoint. If no client + * exists one will be constructed and cached. + * @return a client + */ + public S3Client getClient() { + return S3ClientStore.getInstance().getClientForBucketName(this.name); } public Owner getOwner() { diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileSystem.java b/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileSystem.java index 4fadae951..0038a787c 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileSystem.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileSystem.java @@ -1,20 +1,16 @@ package org.lerch.s3fs; -import static org.lerch.s3fs.S3Path.PATH_SEPARATOR; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.Bucket; import java.io.IOException; -import java.nio.file.FileStore; -import java.nio.file.FileSystem; -import java.nio.file.Path; -import java.nio.file.PathMatcher; -import java.nio.file.WatchService; +import java.nio.file.*; import java.nio.file.attribute.UserPrincipalLookupService; import java.util.Set; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.Bucket; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; +import static org.lerch.s3fs.S3Path.PATH_SEPARATOR; /** * S3FileSystem with a concrete client configured and ready to use. @@ -63,7 +59,7 @@ public class S3FileSystem extends FileSystem implements Comparable @Override public String getSeparator() { - return S3Path.PATH_SEPARATOR; + return PATH_SEPARATOR; } @Override @@ -113,9 +109,15 @@ public class S3FileSystem extends FileSystem implements Comparable throw new UnsupportedOperationException(); } - public S3Client getClient() { - return client; - } +// /** +// * Deprecated: since SDKv2 many S3 operations need to be signed with a client using the same Region as the location +// * of the bucket. Prefer S3Path.client() instead. +// * @return +// */ +// @Deprecated +// public S3Client getClient() { +// return client; +// } /** * get the endpoint associated with this fileSystem. 
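The deprecation above is the core of this refactor: S3 clients are no longer taken from the file system but are resolved per bucket (via the file store and the new `S3ClientStore`), so each request is signed with a client configured for the bucket's region. A minimal Java sketch of the resulting call-site pattern; the helper class and methods here are hypothetical, for illustration only:

```java
import org.lerch.s3fs.S3Path;
import org.lerch.s3fs.util.S3ClientStore;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;

class PerBucketClientSketch {
    // resolve a region-appropriate client through the file store (bucket)
    long sizeOf(S3Path path) {
        S3Client client = path.getFileStore().getClient(); // cached per bucket name by S3ClientStore
        return client.headObject(HeadObjectRequest.builder()
                .bucket(path.getFileStore().name())
                .key(path.getKey())
                .build())
                .contentLength();
    }

    // equivalent lookup when only the bucket name is at hand
    S3Client forBucket(String bucketName) {
        return S3ClientStore.getInstance().getClientForBucketName(bucketName);
    }
}
```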
diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileSystemProvider.java b/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileSystemProvider.java index 0095e372a..e3fdf0065 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileSystemProvider.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/S3FileSystemProvider.java @@ -1,11 +1,11 @@ package org.lerch.s3fs; -import org.apache.commons.lang3.tuple.ImmutablePair; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.lerch.s3fs.attribute.S3BasicFileAttributeView; import org.lerch.s3fs.attribute.S3BasicFileAttributes; import org.lerch.s3fs.attribute.S3PosixFileAttributeView; @@ -36,7 +36,8 @@ import java.util.stream.IntStream; import static com.google.common.collect.Sets.difference; import static java.lang.String.format; -import static java.lang.Thread.*; +import static java.lang.Thread.currentThread; +import static java.lang.Thread.sleep; import static org.lerch.s3fs.AmazonS3Factory.*; /** @@ -286,7 +287,10 @@ public class S3FileSystemProvider extends FileSystemProvider { if (fileSystems.containsKey(key)) { return fileSystems.get(key); } else { - throw new FileSystemNotFoundException("S3 filesystem not yet created. Use newFileSystem() instead"); + final String scheme = uri.getScheme(); + final String uriString = uri.toString(); + uriString.replace(scheme, "https://"); + return (S3FileSystem) newFileSystem(uri, Collections.emptyMap()); } } @@ -337,7 +341,7 @@ public class S3FileSystemProvider extends FileSystemProvider { try { ResponseInputStream res = s3Path - .getFileSystem() + .getFileStore() .getClient() .getObject(GetObjectRequest .builder() @@ -384,7 +388,7 @@ public class S3FileSystemProvider extends FileSystemProvider { Bucket bucket = s3Path.getFileStore().getBucket(); String bucketName = s3Path.getFileStore().name(); if (bucket == null) { - s3Path.getFileSystem().getClient().createBucket(CreateBucketRequest.builder().bucket(bucketName).build()); + s3Path.getFileStore().getClient().createBucket(CreateBucketRequest.builder().bucket(bucketName).build()); } // create the object as directory PutObjectRequest.Builder builder = PutObjectRequest.builder(); @@ -392,7 +396,7 @@ public class S3FileSystemProvider extends FileSystemProvider { builder.bucket(bucketName) .key(directoryKey) .contentLength(0L); - s3Path.getFileSystem().getClient().putObject(builder.build(), RequestBody.fromBytes(new byte[0])); + s3Path.getFileStore().getClient().putObject(builder.build(), RequestBody.fromBytes(new byte[0])); } @Override @@ -405,9 +409,9 @@ public class S3FileSystemProvider extends FileSystemProvider { String key = s3Path.getKey(); String bucketName = s3Path.getFileStore().name(); - s3Path.getFileSystem().getClient().deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(key).build()); + s3Path.getFileStore().getClient().deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(key).build()); // we delete the two objects (sometimes exists the key '/' and sometimes not) - s3Path.getFileSystem().getClient().deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(key + "/").build()); + s3Path.getFileStore().getClient().deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(key + "/").build()); } @Override @@ -437,7 +441,8 @@ public class 
S3FileSystemProvider extends FileSystemProvider { String keySource = s3Source.getKey(); String bucketNameTarget = s3Target.getFileStore().name(); String keyTarget = s3Target.getKey(); - s3Source.getFileSystem() + // for a cross region copy the client must be for the target (region) not the source region + s3Target.getFileStore() .getClient() .copyObject(CopyObjectRequest.builder() .copySource(bucketNameOrigin + "/" + keySource) @@ -458,7 +463,7 @@ public class S3FileSystemProvider extends FileSystemProvider { private void multiPartCopy(S3Path source, long objectSize, S3Path target, CopyOption... options) { log.info(() -> "Attempting multipart copy as part of call cache hit: source = " + source + ", objectSize = " + objectSize + ", target = " + target + ", options = " + Arrays.deepToString(options)); - S3Client s3Client = target.getFileSystem().getClient(); + S3Client s3Client = target.getFileStore().getClient(); final CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder() .bucket(target.getFileStore().name()) @@ -594,7 +599,7 @@ public class S3FileSystemProvider extends FileSystemProvider { */ private long objectSize(S3Path object) { - S3Client s3Client = object.getFileSystem().getClient(); + S3Client s3Client = object.getFileStore().getClient(); final String bucket = object.getFileStore().name(); final String key = object.getKey(); final HeadObjectResponse headObjectResponse = s3Client.headObject(HeadObjectRequest.builder() @@ -656,7 +661,7 @@ public class S3FileSystemProvider extends FileSystemProvider { String key = s3Utils.getS3ObjectSummary(s3Path).key(); String bucket = s3Path.getFileStore().name(); S3AccessControlList accessControlList = - new S3AccessControlList(bucket, key, s3Path.getFileSystem().getClient().getObjectAcl(GetObjectAclRequest.builder().bucket(bucket).key(key).build()).grants(), s3Path.getFileStore().getOwner()); + new S3AccessControlList(bucket, key, s3Path.getFileStore().getClient().getObjectAcl(GetObjectAclRequest.builder().bucket(bucket).key(key).build()).grants(), s3Path.getFileStore().getOwner()); accessControlList.checkAccess(modes); } diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/S3Iterator.java b/filesystems/s3/src/main/java/org/lerch/s3fs/S3Iterator.java index 3d8f5cb80..803b52758 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/S3Iterator.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/S3Iterator.java @@ -1,21 +1,15 @@ package org.lerch.s3fs; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Set; - +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.lerch.s3fs.util.S3Utils; import software.amazon.awssdk.services.s3.model.CommonPrefix; import software.amazon.awssdk.services.s3.model.ListObjectsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsResponse; import software.amazon.awssdk.services.s3.model.S3Object; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.lerch.s3fs.util.S3Utils; + +import java.nio.file.Path; +import java.util.*; /** * S3 iterator over folders at first level. 
@@ -50,7 +44,7 @@ public class S3Iterator implements Iterator { this.fileStore = fileStore; this.fileSystem = fileStore.getFileSystem(); this.key = key; - this.current = fileSystem.getClient().listObjects(listObjectsRequest); + this.current = fileStore.getClient().listObjects(listObjectsRequest); this.incremental = incremental; loadObjects(); } @@ -69,7 +63,7 @@ public class S3Iterator implements Iterator { .marker(current.nextMarker()) .build(); - this.current = fileSystem.getClient().listObjects(request); + this.current = fileStore.getClient().listObjects(request); loadObjects(); } if (cursor == size) diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/S3Path.java b/filesystems/s3/src/main/java/org/lerch/s3fs/S3Path.java index 271a1b31e..1382c16c3 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/S3Path.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/S3Path.java @@ -4,19 +4,19 @@ import com.google.common.base.*; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.lerch.s3fs.attribute.S3BasicFileAttributes; +import software.amazon.awssdk.services.s3.S3Client; import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URI; -import java.net.URL; import java.net.URLDecoder; import java.nio.file.*; import java.util.Iterator; import java.util.List; -import java.util.Map; import static com.google.common.collect.Iterables.*; +import static com.google.common.collect.Iterables.concat; import static java.lang.String.format; public class S3Path implements Path { diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/S3SeekableByteChannel.java b/filesystems/s3/src/main/java/org/lerch/s3fs/S3SeekableByteChannel.java index 31318d85c..5f0ff5715 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/S3SeekableByteChannel.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/S3SeekableByteChannel.java @@ -1,8 +1,11 @@ package org.lerch.s3fs; -import static java.lang.String.format; +import org.apache.tika.Tika; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import java.io.ByteArrayInputStream; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; @@ -13,21 +16,14 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; -import org.apache.tika.Tika; - -import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.S3Object; +import static java.lang.String.format; public class S3SeekableByteChannel implements SeekableByteChannel, S3Channel { - private S3Path path; - private Set options; - private SeekableByteChannel seekable; - private Path tempFile; + private final S3Path path; + private final Set options; + private final SeekableByteChannel seekable; + private final Path tempFile; /** * Open or creates a file, returning a seekable byte channel @@ -52,7 +48,7 @@ public class S3SeekableByteChannel implements SeekableByteChannel, S3Channel { boolean removeTempFile = true; try { if (exists) { - try (InputStream byteStream = 
path.getFileSystem().getClient() + try (InputStream byteStream = path.getFileStore().getClient() .getObject(GetObjectRequest.builder().bucket(path.getFileStore().getBucket().name()).key(key).build())) { Files.copy(byteStream, tempFile, StandardCopyOption.REPLACE_EXISTING); } @@ -115,7 +111,7 @@ public class S3SeekableByteChannel implements SeekableByteChannel, S3Channel { builder.bucket(path.getFileStore().name()); builder.key(path.getKey()); - S3Client client = path.getFileSystem().getClient(); + S3Client client = path.getFileStore().getClient(); client.putObject(builder.build(), RequestBody.fromInputStream(stream, length)); } diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/attribute/S3BasicFileAttributes.java b/filesystems/s3/src/main/java/org/lerch/s3fs/attribute/S3BasicFileAttributes.java index 40e02b17a..895f5109f 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/attribute/S3BasicFileAttributes.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/attribute/S3BasicFileAttributes.java @@ -1,10 +1,10 @@ package org.lerch.s3fs.attribute; -import static java.lang.String.format; - import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.FileTime; +import static java.lang.String.format; + public class S3BasicFileAttributes implements BasicFileAttributes { private final FileTime lastModifiedTime; private final long size; diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/attribute/S3PosixFileAttributes.java b/filesystems/s3/src/main/java/org/lerch/s3fs/attribute/S3PosixFileAttributes.java index fbe5efac5..6ffdee62f 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/attribute/S3PosixFileAttributes.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/attribute/S3PosixFileAttributes.java @@ -3,8 +3,6 @@ package org.lerch.s3fs.attribute; import java.nio.file.attribute.*; import java.util.Set; -import static java.lang.String.format; - public class S3PosixFileAttributes extends S3BasicFileAttributes implements PosixFileAttributes { private UserPrincipal userPrincipal; diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/util/S3ClientStore.java b/filesystems/s3/src/main/java/org/lerch/s3fs/util/S3ClientStore.java new file mode 100644 index 000000000..78a139d3f --- /dev/null +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/util/S3ClientStore.java @@ -0,0 +1,86 @@ +package org.lerch.s3fs.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.HeadBucketResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; + +import java.net.URI; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A Singleton cache of clients for buckets configured for the region of those buckets + */ +public class S3ClientStore { + + private static final S3ClientStore instance = new S3ClientStore(); + + public static final S3Client DEFAULT_CLIENT = S3Client.builder().endpointOverride(URI.create("https://s3.us-east-1.amazonaws.com")).region(Region.US_EAST_1).build(); + + private final Map bucketToClientMap = Collections.synchronizedMap(new HashMap<>()); + + Logger logger = LoggerFactory.getLogger("S3ClientStore"); + + + private S3ClientStore(){} + + public static S3ClientStore getInstance(){ + return instance; + } + + public S3Client getClientForBucketName( String bucketName ) { + logger.debug("obtaining client for bucket '{}'", bucketName); + if (bucketName 
== null || bucketName.trim().equals("")) { + return DEFAULT_CLIENT; + } + + return bucketToClientMap.computeIfAbsent(bucketName, this::generateClient); + } + + private S3Client generateClient (String name) { + logger.info("generating client for bucket: '{}'", name); + S3Client bucketSpecificClient; + try { + logger.info("determining bucket location with getBucketLocation"); + String bucketLocation = DEFAULT_CLIENT.getBucketLocation(builder -> builder.bucket(name)).locationConstraintAsString(); + + bucketSpecificClient = this.clientForRegion(bucketLocation); + + } catch (S3Exception e) { + if(e.statusCode() == 403) { + logger.info("Cannot determine location of '{}' bucket directly. Attempting to obtain bucket location with headBucket operation", name); + try { + final HeadBucketResponse headBucketResponse = DEFAULT_CLIENT.headBucket(builder -> builder.bucket(name)); + bucketSpecificClient = this.clientForRegion(headBucketResponse.sdkHttpResponse().firstMatchingHeader("x-amz-bucket-region").orElseThrow()); + } catch (S3Exception e2) { + if (e2.statusCode() == 301) { + bucketSpecificClient = this.clientForRegion(e2.awsErrorDetails().sdkHttpResponse().firstMatchingHeader("x-amz-bucket-region").orElseThrow()); + } else { + throw e2; + } + } + } else { + throw e; + } + } + + if (bucketSpecificClient == null) { + logger.warn("Unable to determine the region of bucket: '{}'. Generating a client for the current region.", name); + bucketSpecificClient = S3Client.create(); + } + + return bucketSpecificClient; + } + + private S3Client clientForRegion(String regionString){ + // It may be useful to further cache clients for regions although at some point clients for buckets may need to be + // specialized beyond just region end points. + Region region = regionString.equals("") ? 
Region.US_EAST_1 : Region.of(regionString); + logger.info("bucket region is: '{}'", region.id()); + return S3Client.builder().region(region).build(); + } +} diff --git a/filesystems/s3/src/main/java/org/lerch/s3fs/util/S3Utils.java b/filesystems/s3/src/main/java/org/lerch/s3fs/util/S3Utils.java index e8586ef4d..4f4a3d249 100644 --- a/filesystems/s3/src/main/java/org/lerch/s3fs/util/S3Utils.java +++ b/filesystems/s3/src/main/java/org/lerch/s3fs/util/S3Utils.java @@ -1,22 +1,14 @@ package org.lerch.s3fs.util; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.HeadObjectRequest; -import software.amazon.awssdk.services.s3.model.HeadObjectResponse; -import software.amazon.awssdk.services.s3.model.GetObjectAclRequest; -import software.amazon.awssdk.services.s3.model.GetObjectAclResponse; -import software.amazon.awssdk.services.s3.model.Grant; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; -import software.amazon.awssdk.services.s3.model.Owner; -import software.amazon.awssdk.services.s3.model.Permission; -import software.amazon.awssdk.services.s3.model.S3Object; -import software.amazon.awssdk.services.s3.model.S3Exception; import com.google.common.collect.Sets; -import org.lerch.s3fs.attribute.S3BasicFileAttributes; import org.lerch.s3fs.S3Path; +import org.lerch.s3fs.attribute.S3BasicFileAttributes; import org.lerch.s3fs.attribute.S3PosixFileAttributes; import org.lerch.s3fs.attribute.S3UserPrincipal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.*; import java.nio.file.NoSuchFileException; import java.nio.file.attribute.FileTime; @@ -24,12 +16,12 @@ import java.nio.file.attribute.PosixFilePermission; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; /** * Utilities to work with Amazon S3 Objects. */ public class S3Utils { + Logger log = LoggerFactory.getLogger("S3Utils"); /** * Get the {@link S3Object} that represent this Path or her first child if this path not exists @@ -41,18 +33,24 @@ public class S3Utils { public S3Object getS3ObjectSummary(S3Path s3Path) throws NoSuchFileException { String key = s3Path.getKey(); String bucketName = s3Path.getFileStore().name(); - S3Client client = s3Path.getFileSystem().getClient(); + S3Client client = s3Path.getFileStore().getClient(); // try to find the element with the current key (maybe with end slash or maybe not.) try { HeadObjectResponse metadata = client.headObject(HeadObjectRequest.builder().bucket(bucketName).key(key).build()); - GetObjectAclResponse acl = client.getObjectAcl(GetObjectAclRequest.builder().bucket(bucketName).key(key).build()); + Owner objectOwner = Owner.builder().build(); + try { + GetObjectAclResponse acl = client.getObjectAcl(GetObjectAclRequest.builder().bucket(bucketName).key(key).build()); + objectOwner = acl.owner(); + } catch (S3Exception e2){ + log.warn("Unable to determine the owner of object: '{}', setting owner as empty", s3Path); + } S3Object.Builder builder = S3Object.builder(); builder .key(key) .lastModified(metadata.lastModified()) .eTag(metadata.eTag()) - .owner(acl.owner()) + .owner(objectOwner) .size(metadata.contentLength()) .storageClass(metadata.storageClassAsString()); @@ -63,9 +61,9 @@ public class S3Utils { } // if not found (404 err) with the original key. 
- // try to find the elment as a directory. + // try to find the element as a directory. try { - // is a virtual directory + // is a virtual directory (S3 prefix) ListObjectsV2Request.Builder request = ListObjectsV2Request.builder(); request.bucket(bucketName); String keyFolder = key; @@ -111,7 +109,7 @@ public class S3Utils { Set permissions = null; if (!attrs.isDirectory()) { - S3Client client = s3Path.getFileSystem().getClient(); + S3Client client = s3Path.getFileStore().getClient(); GetObjectAclResponse acl = client.getObjectAcl(GetObjectAclRequest.builder().bucket(bucketName).key(key).build()); Owner owner = acl.owner(); diff --git a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala index 8058d8979..645600d16 100644 --- a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala +++ b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala @@ -30,9 +30,11 @@ */ package cromwell.filesystems.s3.batch -import cromwell.core.io.{IoCommandBuilder, PartialIoCommandBuilder} +import cromwell.core.io.{IoCommandBuilder, IoContentAsStringCommand, IoIsDirectoryCommand, IoReadLinesCommand, IoWriteCommand, PartialIoCommandBuilder} +import cromwell.core.path.BetterFileMethods.OpenOptions import cromwell.core.path.Path import cromwell.filesystems.s3.S3Path +import org.slf4j.{Logger, LoggerFactory} import scala.util.Try @@ -40,6 +42,29 @@ import scala.util.Try * Generates commands for IO operations on S3 */ private case object PartialS3BatchCommandBuilder extends PartialIoCommandBuilder { + val Log: Logger = LoggerFactory.getLogger(PartialS3BatchCommandBuilder.getClass) + + + override def contentAsStringCommand: PartialFunction[(Path, Option[Int], Boolean), Try[IoContentAsStringCommand]] = { + Log.debug("call to contentAsStringCommand but PartialFunction not implemented, falling back to super") + super.contentAsStringCommand + } + + override def writeCommand: PartialFunction[(Path, String, OpenOptions, Boolean), Try[IoWriteCommand]] = { + Log.debug("call to writeCommand but PartialFunction not implemented, falling back to super") + super.writeCommand + } + + override def isDirectoryCommand: PartialFunction[Path, Try[IoIsDirectoryCommand]] = { + Log.debug("call to isDirectoryCommand but PartialFunction not implemented, falling back to super") + super.isDirectoryCommand + } + + override def readLinesCommand: PartialFunction[Path, Try[IoReadLinesCommand]] = { + Log.debug("call to readLinesCommand but PartialFunction not implemented, falling back to super") + super.readLinesCommand + } + override def sizeCommand: PartialFunction[Path, Try[S3BatchSizeCommand]] = { case path: S3Path => Try(S3BatchSizeCommand(path)) } diff --git a/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala index a3c4f5fa5..954f5f355 100644 --- a/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala @@ -56,7 +56,7 @@ case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, ser private val metadataReadTimeout: Duration = serviceConfig.getOrElse[Duration]("metadata-read-query-timeout", Duration.Inf) private val metadataReadRowNumberSafetyThreshold: Int = - 
serviceConfig.getOrElse[Int]("metadata-read-row-number-safety-threshold", 1000000) + serviceConfig.getOrElse[Int]("metadata-read-row-number-safety-threshold", 3000000) def readMetadataWorkerActorProps(): Props = ReadDatabaseMetadataWorkerActor diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala index 7a36946e7..4ceb0c829 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala @@ -202,10 +202,18 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar private def inputsFromWomFiles(namePrefix: String, remotePathArray: Seq[WomFile], localPathArray: Seq[WomFile], - jobDescriptor: BackendJobDescriptor): Iterable[AwsBatchInput] = { + jobDescriptor: BackendJobDescriptor, + flag: Boolean): Iterable[AwsBatchInput] = { + (remotePathArray zip localPathArray zipWithIndex) flatMap { case ((remotePath, localPath), index) => - Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPath.valueString), workingDisk)) + var localPathString = localPath.valueString + if (localPathString.startsWith("s3://")){ + localPathString = localPathString.replace("s3://", "") + }else if (localPathString.startsWith("s3:/")) { + localPathString = localPathString.replace("s3:/", "") + } + Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPathString), workingDisk)) } } @@ -237,7 +245,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar val writeFunctionFiles = instantiatedCommand.createdFiles map { f => f.file.value.md5SumShort -> List(f) } toMap val writeFunctionInputs = writeFunctionFiles flatMap { - case (name, files) => inputsFromWomFiles(name, files.map(_.file), files.map(localizationPath), jobDescriptor) + case (name, files) => inputsFromWomFiles(name, files.map(_.file), files.map(localizationPath), jobDescriptor, false) } // Collect all WomFiles from inputs to the call. 
@@ -257,7 +265,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } val callInputInputs = callInputFiles flatMap { - case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor) + case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor, true) } val scriptInput: AwsBatchInput = AwsBatchFileInput( diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala index 26f69c4e7..dffcb9832 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala @@ -71,7 +71,14 @@ object AwsBatchAttributes { "filesystems.local.auth", "filesystems.s3.auth", "filesystems.s3.caching.duplication-strategy", - "filesystems.local.caching.duplication-strategy" + "filesystems.local.caching.duplication-strategy", + "auth", + "numCreateDefinitionAttempts", + "filesystems.s3.duplication-strategy", + "numSubmitAttempts", + "default-runtime-attributes.scriptBucketName", + "awsBatchRetryAttempts", + "ulimits" ) private val deprecatedAwsBatchKeys: Map[String, String] = Map( diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala index 2378c48c8..5068c4a97 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala @@ -30,8 +30,6 @@ */ package cromwell.backend.impl.aws -import java.security.MessageDigest - import cats.data.ReaderT._ import cats.data.{Kleisli, ReaderT} import cats.effect.{Async, Timer} @@ -55,10 +53,11 @@ import software.amazon.awssdk.services.s3.S3Client import software.amazon.awssdk.services.s3.model.{GetObjectRequest, HeadObjectRequest, NoSuchKeyException, PutObjectRequest} import wdl4s.parser.MemoryUnit +import java.security.MessageDigest import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.higherKinds -import scala.util.{Random, Try} +import scala.util.Try /** * The actual job for submission in AWS batch. `AwsBatchJob` is the primary interface to AWS Batch. 
It creates the @@ -85,18 +84,12 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL optAwsAuthMode: Option[AwsAuthMode] = None ) { - // values for container environment - val AWS_MAX_ATTEMPTS: String = "AWS_MAX_ATTEMPTS" - val AWS_MAX_ATTEMPTS_DEFAULT_VALUE: String = "14" - val AWS_RETRY_MODE: String = "AWS_RETRY_MODE" - val AWS_RETRY_MODE_DEFAULT_VALUE: String = "adaptive" val Log: Logger = LoggerFactory.getLogger(AwsBatchJob.getClass) //this will be the "folder" that scripts will live in (underneath the script bucket) val scriptKeyPrefix = "scripts/" - // TODO: Auth, endpoint lazy val batchClient: BatchClient = { val builder = BatchClient.builder() configureClient(builder, optAwsAuthMode, configRegion) @@ -119,26 +112,25 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL */ lazy val reconfiguredScript: String = { //this is the location of the aws cli mounted into the container by the ec2 launch template - val s3Cmd = "/usr/local/aws-cli/v2/current/bin/aws s3" + val awsCmd = "/usr/local/aws-cli/v2/current/bin/aws " //internal to the container, therefore not mounted val workDir = "/tmp/scratch" //working in a mount will cause collisions in long running workers val replaced = commandScript.replaceAllLiterally(AwsBatchWorkingDisk.MountPoint.pathAsString, workDir) val insertionPoint = replaced.indexOf("\n", replaced.indexOf("#!")) +1 //just after the new line after the shebang! - /* generate a series of s3 copy statements to copy any s3 files into the container. We randomize the order - so that large scatters don't all attempt to copy the same thing at the same time. */ - val inputCopyCommand = Random.shuffle(inputs.map { + /* generate a series of s3 copy statements to copy any s3 files into the container. */ + val inputCopyCommand = inputs.map { case input: AwsBatchFileInput if input.s3key.startsWith("s3://") && input.s3key.endsWith(".tmp") => //we are localizing a tmp file which may contain workdirectory paths that need to be reconfigured s""" - |$s3Cmd cp --no-progress ${input.s3key} $workDir/${input.local} + |_s3_localize_with_retry ${input.s3key} $workDir/${input.local} |sed -i 's#${AwsBatchWorkingDisk.MountPoint.pathAsString}#$workDir#g' $workDir/${input.local} |""".stripMargin case input: AwsBatchFileInput if input.s3key.startsWith("s3://") => - s"$s3Cmd cp --no-progress ${input.s3key} ${input.mount.mountPoint.pathAsString}/${input.local}" + s"_s3_localize_with_retry ${input.s3key} ${input.mount.mountPoint.pathAsString}/${input.local}" .replaceAllLiterally(AwsBatchWorkingDisk.MountPoint.pathAsString, workDir) case input: AwsBatchFileInput => @@ -149,11 +141,41 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL s"test -e $filePath || echo 'input file: $filePath does not exist' && exit 1" case _ => "" - }.toList).mkString("\n") + }.toList.mkString("\n") // this goes at the start of the script after the #! 
val preamble = s""" + |export AWS_METADATA_SERVICE_TIMEOUT=10 + |export AWS_METADATA_SERVICE_NUM_ATTEMPTS=10 + | + |function _s3_localize_with_retry() { + | local s3_path=$$1 + | # destination must be the path to a file and not just the directory you want the file in + | local destination=$$2 + | + | for i in {1..5}; + | do + | if [[ $$s3_path =~ s3://([^/]+)/(.+) ]]; then + | bucket="$${BASH_REMATCH[1]}" + | key="$${BASH_REMATCH[2]}" + | content_length=$$($awsCmd s3api head-object --bucket "$$bucket" --key "$$key" --query 'ContentLength') + | else + | echo "$$s3_path is not an S3 path with a bucket and key. aborting" + | exit 1 + | fi + | $awsCmd s3 cp --no-progress "$$s3_path" "$$destination" && + | [[ $$(LC_ALL=C ls -dn -- "$$destination" | awk '{print $$5; exit}') -eq "$$content_length" ]] && break || + | echo "attempt $$i to copy $$s3_path failed"; + | + | if [ "$$i" -eq 5 ]; then + | echo "failed to copy $$s3_path after $$i attempts. aborting" + | exit 2 + | fi + | sleep $$((7 * "$$i")) + | done + |} + | |{ |set -e |echo '*** LOCALIZING INPUTS ***' @@ -183,24 +205,24 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL */ s""" |touch ${output.name} - |$s3Cmd cp --no-progress ${output.name} ${output.s3key} - |if [ -e $globDirectory ]; then $s3Cmd cp --no-progress $globDirectory $s3GlobOutDirectory --recursive --exclude "cromwell_glob_control_file"; fi + |$awsCmd s3 cp --no-progress ${output.name} ${output.s3key} + |if [ -e $globDirectory ]; then $awsCmd s3 cp --no-progress $globDirectory $s3GlobOutDirectory --recursive --exclude "cromwell_glob_control_file"; fi |""".stripMargin case output: AwsBatchFileOutput if output.s3key.startsWith("s3://") && output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString => //output is on working disk mount s""" - |$s3Cmd cp --no-progress $workDir/${output.local.pathAsString} ${output.s3key} + |$awsCmd s3 cp --no-progress $workDir/${output.local.pathAsString} ${output.s3key} |""".stripMargin case output: AwsBatchFileOutput => //output on a different mount - s"$s3Cmd cp --no-progress ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString} ${output.s3key}" + s"$awsCmd s3 cp --no-progress ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString} ${output.s3key}" case _ => "" }.mkString("\n") + "\n" + s""" - |if [ -f $workDir/${jobPaths.returnCodeFilename} ]; then $s3Cmd cp --no-progress $workDir/${jobPaths.returnCodeFilename} ${jobPaths.callRoot.pathAsString}/${jobPaths.returnCodeFilename} ; fi\n - |if [ -f $stdErr ]; then $s3Cmd cp --no-progress $stdErr ${jobPaths.standardPaths.error.pathAsString}; fi - |if [ -f $stdOut ]; then $s3Cmd cp --no-progress $stdOut ${jobPaths.standardPaths.output.pathAsString}; fi + |if [ -f $workDir/${jobPaths.returnCodeFilename} ]; then $awsCmd s3 cp --no-progress $workDir/${jobPaths.returnCodeFilename} ${jobPaths.callRoot.pathAsString}/${jobPaths.returnCodeFilename} ; fi\n + |if [ -f $stdErr ]; then $awsCmd s3 cp --no-progress $stdErr ${jobPaths.standardPaths.error.pathAsString}; fi + |if [ -f $stdOut ]; then $awsCmd s3 cp --no-progress $stdOut ${jobPaths.standardPaths.output.pathAsString}; fi |""".stripMargin @@ -212,6 +234,10 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |echo '*** DELOCALIZING OUTPUTS ***' |$outputCopyCommand |echo '*** COMPLETED DELOCALIZATION ***' + |echo '*** EXITING WITH RETURN CODE ***' + |rc=$$(head -n 1 $workDir/${jobPaths.returnCodeFilename}) + |echo $$rc + |exit $$rc |} 
|""".stripMargin } @@ -221,8 +247,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL } private def generateEnvironmentKVPairs(scriptBucketName: String, scriptKeyPrefix: String, scriptKey: String): List[KeyValuePair] = { - List(buildKVPair(AWS_MAX_ATTEMPTS, AWS_MAX_ATTEMPTS_DEFAULT_VALUE), - buildKVPair(AWS_RETRY_MODE, AWS_RETRY_MODE_DEFAULT_VALUE), + List( buildKVPair("BATCH_FILE_TYPE", "script"), buildKVPair("BATCH_FILE_S3_URL",batch_file_s3_url(scriptBucketName,scriptKeyPrefix,scriptKey))) } @@ -388,16 +413,19 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL // See: // // http://aws-java-sdk-javadoc.s3-website-us-west-2.amazonaws.com/latest/software/amazon/awssdk/services/batch/model/RegisterJobDefinitionRequest.Builder.html - val definitionRequest = RegisterJobDefinitionRequest.builder + var definitionRequest = RegisterJobDefinitionRequest.builder .containerProperties(jobDefinition.containerProperties) .jobDefinitionName(jobDefinitionName) // See https://stackoverflow.com/questions/24349517/scala-method-named-type .`type`(JobDefinitionType.CONTAINER) - .build + + if (jobDefinitionContext.runtimeAttributes.awsBatchRetryAttempts != 0){ + definitionRequest = definitionRequest.retryStrategy(jobDefinition.retryStrategy) + } Log.debug(s"Submitting definition request: $definitionRequest") - val response: RegisterJobDefinitionResponse = batchClient.registerJobDefinition(definitionRequest) + val response: RegisterJobDefinitionResponse = batchClient.registerJobDefinition(definitionRequest.build) Log.info(s"Definition created: $response") response.jobDefinitionArn() } @@ -442,7 +470,6 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL } yield runStatus def detail(jobId: String): JobDetail = { - //TODO: This client call should be wrapped in a cats Effect val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(jobId).build) val jobDetail = describeJobsResponse.jobs.asScala.headOption. 
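The conditional registration above follows the plain AWS SDK v2 builder pattern: a retry strategy is attached to the job definition only when retries were requested; otherwise the definition is registered without one and the job runs once. A standalone, illustrative Java sketch (not Cromwell code; the class and method names are hypothetical):

```java
import software.amazon.awssdk.services.batch.BatchClient;
import software.amazon.awssdk.services.batch.model.ContainerProperties;
import software.amazon.awssdk.services.batch.model.JobDefinitionType;
import software.amazon.awssdk.services.batch.model.RegisterJobDefinitionRequest;
import software.amazon.awssdk.services.batch.model.RetryStrategy;

class RetryRegistrationSketch {
    String register(BatchClient batch, String name, ContainerProperties props, int retryAttempts) {
        RegisterJobDefinitionRequest.Builder builder = RegisterJobDefinitionRequest.builder()
                .jobDefinitionName(name)
                .type(JobDefinitionType.CONTAINER)
                .containerProperties(props);
        if (retryAttempts > 0) {
            // attach a retry strategy only when 1-10 attempts were requested
            builder.retryStrategy(RetryStrategy.builder().attempts(retryAttempts).build());
        }
        return batch.registerJobDefinition(builder.build()).jobDefinitionArn();
    }
}
```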
diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index 137cce9a4..876a23c32 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -35,7 +35,7 @@ import scala.language.postfixOps import scala.collection.mutable.ListBuffer import cromwell.backend.BackendJobDescriptor import cromwell.backend.io.JobPaths -import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, Volume} +import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, RetryStrategy, Volume, Ulimit} import cromwell.backend.impl.aws.io.AwsBatchVolume import scala.collection.JavaConverters._ @@ -62,12 +62,14 @@ import wdl4s.parser.MemoryUnit */ sealed trait AwsBatchJobDefinition { def containerProperties: ContainerProperties + def retryStrategy: RetryStrategy def name: String override def toString: String = { new ToStringBuilder(this, ToStringStyle.JSON_STYLE) .append("name", name) .append("containerProperties", containerProperties) + .append("retryStrategy", retryStrategy) .build } } @@ -78,23 +80,13 @@ trait AwsBatchJobDefinitionBuilder { /** Gets a builder, seeded with appropriate portions of the container properties * - * @param dockerImage docker image with which to run - * @return ContainerProperties builder ready for modification + * @param context AwsBatchJobDefinitionContext with all the runtime attributes + * @return ContainerProperties builder ready for modification and name * */ - def builder(dockerImage: String): ContainerProperties.Builder = - ContainerProperties.builder().image(dockerImage) - - - def buildResources(builder: ContainerProperties.Builder, - context: AwsBatchJobDefinitionContext): (ContainerProperties.Builder, String) = { - // The initial buffer should only contain one item - the hostpath of the - // local disk mount point, which will be needed by the docker container - // that copies data around - - val environment = List.empty[KeyValuePair] - - + def containerPropertiesBuilder(context: AwsBatchJobDefinitionContext): (ContainerProperties.Builder, String) = { + + def buildVolumes(disks: Seq[AwsBatchVolume]): List[Volume] = { //all the configured disks plus the fetch and run volume and the aws-cli volume @@ -111,6 +103,7 @@ trait AwsBatchJobDefinitionBuilder { ) } + def buildMountPoints(disks: Seq[AwsBatchVolume]): List[MountPoint] = { //all the configured disks plus the fetch and run mount point and the AWS cli mount point @@ -130,45 +123,63 @@ trait AwsBatchJobDefinitionBuilder { ) } - def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair]): String = { - val str = s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}" - val sha1 = MessageDigest.getInstance("SHA-1") - .digest( str.getBytes("UTF-8") ) - .map("%02x".format(_)).mkString - - val prefix = s"cromwell_$imageName".slice(0,88) // will be joined to a 40 character SHA1 for total length of 128 + def buildUlimits(ulimits: Seq[Map[String, String]]): List[Ulimit] = { - sanitize(prefix + sha1) + ulimits.filter(_.nonEmpty).map(u => + Ulimit.builder() + .name(u("name")) + .softLimit(u("softLimit").toInt) 
+ .hardLimit(u("hardLimit").toInt) + .build() + ).toList } + def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair], ulimits: List[Ulimit]): String = { + s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}" + } + + + val environment = List.empty[KeyValuePair] val cmdName = context.runtimeAttributes.fileSystem match { - case AWSBatchStorageSystems.s3 => "/var/scratch/fetch_and_run.sh" - case _ => context.commandText + case AWSBatchStorageSystems.s3 => "/var/scratch/fetch_and_run.sh" + case _ => context.commandText } val packedCommand = packCommand("/bin/bash", "-c", cmdName) val volumes = buildVolumes( context.runtimeAttributes.disks ) val mountPoints = buildMountPoints( context.runtimeAttributes.disks) - val jobDefinitionName = buildName( + val ulimits = buildUlimits( context.runtimeAttributes.ulimits) + val containerPropsName = buildName( context.runtimeAttributes.dockerImage, packedCommand.mkString(","), volumes, mountPoints, - environment + environment, + ulimits ) - (builder - .command(packedCommand.asJava) - .memory(context.runtimeAttributes.memory.to(MemoryUnit.MB).amount.toInt) - .vcpus(context.runtimeAttributes.cpu##) - .volumes( volumes.asJava) - .mountPoints( mountPoints.asJava) - .environment(environment.asJava), + (ContainerProperties.builder() + .image(context.runtimeAttributes.dockerImage) + .command(packedCommand.asJava) + .memory(context.runtimeAttributes.memory.to(MemoryUnit.MB).amount.toInt) + .vcpus(context.runtimeAttributes.cpu##) + .volumes(volumes.asJava) + .mountPoints(mountPoints.asJava) + .environment(environment.asJava) + .ulimits(ulimits.asJava), + containerPropsName) + } - jobDefinitionName) + def retryStrategyBuilder(context: AwsBatchJobDefinitionContext): (RetryStrategy.Builder, String) = { + // We can add here the 'evaluateOnExit' statement + + (RetryStrategy.builder() + .attempts(context.runtimeAttributes.awsBatchRetryAttempts), + context.runtimeAttributes.awsBatchRetryAttempts.toString) } + private def packCommand(shell: String, options: String, mainCommand: String): Seq[String] = { val rc = new ListBuffer[String]() val lim = 1024 @@ -189,15 +200,29 @@ trait AwsBatchJobDefinitionBuilder { object StandardAwsBatchJobDefinitionBuilder extends AwsBatchJobDefinitionBuilder { def build(context: AwsBatchJobDefinitionContext): AwsBatchJobDefinition = { - //instantiate a builder with the name of the docker image - val builderInst = builder(context.runtimeAttributes.dockerImage) - val (b, name) = buildResources(builderInst, context) + + val (containerPropsInst, containerPropsName) = containerPropertiesBuilder(context) + val (retryStrategyInst, retryStrategyName) = retryStrategyBuilder(context) - new StandardAwsBatchJobDefinitionBuilder(b.build, name) + val name = buildName(context.runtimeAttributes.dockerImage, containerPropsName, retryStrategyName) + + new StandardAwsBatchJobDefinitionBuilder(containerPropsInst.build, retryStrategyInst.build, name) } + + def buildName(imageName: String, containerPropsName: String, retryStrategyName: String): String = { + val str = s"$imageName:$containerPropsName:$retryStrategyName" + + val sha1 = MessageDigest.getInstance("SHA-1") + .digest( str.getBytes("UTF-8") ) + .map("%02x".format(_)).mkString + + val prefix = s"cromwell_${imageName}_".slice(0,88) // will be joined to a 40 character SHA1 for total length of 128 + 
+ sanitize(prefix + sha1) + } } -case class StandardAwsBatchJobDefinitionBuilder private(containerProperties: ContainerProperties, name: String) extends AwsBatchJobDefinition +case class StandardAwsBatchJobDefinitionBuilder private(containerProperties: ContainerProperties, retryStrategy: RetryStrategy, name: String) extends AwsBatchJobDefinition object AwsBatchJobDefinitionContext diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala index c6fc2a5f5..8296eefd4 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala @@ -60,6 +60,8 @@ import scala.util.matching.Regex * @param noAddress is there no address * @param scriptS3BucketName the s3 bucket where the execution command or script will be written and, from there, fetched into the container and executed * @param fileSystem the filesystem type, default is "s3" + * @param awsBatchRetryAttempts number of attempts that AWS Batch will retry the task if it fails + * @param ulimits ulimit values to be passed to the container */ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, zones: Vector[String], @@ -71,7 +73,9 @@ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, continueOnReturnCode: ContinueOnReturnCode, noAddress: Boolean, scriptS3BucketName: String, - fileSystem:String= "s3") + awsBatchRetryAttempts: Int, + ulimits: Vector[Map[String, String]], + fileSystem: String= "s3") object AwsBatchRuntimeAttributes { @@ -79,6 +83,8 @@ object AwsBatchRuntimeAttributes { val scriptS3BucketKey = "scriptBucketName" + val awsBatchRetryAttemptsKey = "awsBatchRetryAttempts" + val ZonesKey = "zones" private val ZonesDefaultValue = WomString("us-east-1a") @@ -92,6 +98,9 @@ object AwsBatchRuntimeAttributes { private val MemoryDefaultValue = "2 GB" + val UlimitsKey = "ulimits" + private val UlimitsDefaultValue = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue]))) + private def cpuValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instance .withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin) @@ -134,6 +143,14 @@ object AwsBatchRuntimeAttributes { QueueArnValidation.withDefault(QueueArnValidation.configDefaultWomValue(runtimeConfig) getOrElse (throw new RuntimeException("queueArn is required"))) + private def awsBatchRetryAttemptsValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int] = { + AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey).withDefault(AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey) + .configDefaultWomValue(runtimeConfig).getOrElse(WomInteger(0))) + } + + private def ulimitsValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Vector[Map[String, String]]] = + UlimitsValidation.withDefault(UlimitsValidation.configDefaultWomValue(runtimeConfig) getOrElse UlimitsDefaultValue) + def runtimeAttributesBuilder(configuration: AwsBatchConfiguration): StandardValidatedRuntimeAttributesBuilder = { val runtimeConfig = configuration.runtimeConfig def validationsS3backend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( @@ -146,7 +163,9 @@ object AwsBatchRuntimeAttributes { 
noAddressValidation(runtimeConfig), dockerValidation, queueArnValidation(runtimeConfig), - scriptS3BucketNameValidation(runtimeConfig) + scriptS3BucketNameValidation(runtimeConfig), + awsBatchRetryAttemptsValidation(runtimeConfig), + ulimitsValidation(runtimeConfig), ) def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( cpuValidation(runtimeConfig), @@ -181,6 +200,8 @@ object AwsBatchRuntimeAttributes { case AWSBatchStorageSystems.s3 => RuntimeAttributesValidation.extract(scriptS3BucketNameValidation(runtimeAttrsConfig) , validatedRuntimeAttributes) case _ => "" } + val awsBatchRetryAttempts: Int = RuntimeAttributesValidation.extract(awsBatchRetryAttemptsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val ulimits: Vector[Map[String, String]] = RuntimeAttributesValidation.extract(ulimitsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) new AwsBatchRuntimeAttributes( @@ -194,6 +215,8 @@ object AwsBatchRuntimeAttributes { continueOnReturnCode, noAddress, scriptS3BucketName, + awsBatchRetryAttempts, + ulimits, fileSystem ) } @@ -372,3 +395,94 @@ object DisksValidation extends RuntimeAttributesValidation[Seq[AwsBatchVolume]] override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be a comma separated String or Array[String]" } + +object AwsBatchRetryAttemptsValidation { + def apply(key: String): AwsBatchRetryAttemptsValidation = new AwsBatchRetryAttemptsValidation(key) +} + +class AwsBatchRetryAttemptsValidation(key: String) extends IntRuntimeAttributesValidation(key) { + override protected def validateValue: PartialFunction[WomValue, ErrorOr[Int]] = { + case womValue if WomIntegerType.coerceRawValue(womValue).isSuccess => + WomIntegerType.coerceRawValue(womValue).get match { + case WomInteger(value) => + if (value.toInt < 0) + s"Expecting $key runtime attribute value greater than or equal to 0".invalidNel + else if (value.toInt > 10) + s"Expecting $key runtime attribute value lower than or equal to 10".invalidNel + else + value.toInt.validNel + } + } + + override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be an Integer" +} + + +object UlimitsValidation + extends RuntimeAttributesValidation[Vector[Map[String, String]]] { + override def key: String = AwsBatchRuntimeAttributes.UlimitsKey + + override def coercion: Traversable[WomType] = + Set(WomStringType, WomArrayType(WomMapType(WomStringType, WomStringType))) + + var accepted_keys = Set("name", "softLimit", "hardLimit") + + override protected def validateValue + : PartialFunction[WomValue, ErrorOr[Vector[Map[String, String]]]] = { + case WomArray(womType, value) + if womType.memberType == WomMapType(WomStringType, WomStringType) => + check_maps(value.toVector) + case WomMap(_, _) => "!!! ERROR1".invalidNel + + } + + private def check_maps( + maps: Vector[WomValue] + ): ErrorOr[Vector[Map[String, String]]] = { + val entryNels: Vector[ErrorOr[Map[String, String]]] = maps.map { + case WomMap(_, value) => check_keys(value) + case _ => "!!! 
ERROR2".invalidNel + } + val sequenced: ErrorOr[Vector[Map[String, String]]] = sequenceNels( + entryNels + ) + sequenced + } + + private def check_keys( + dict: Map[WomValue, WomValue] + ): ErrorOr[Map[String, String]] = { + val map_keys = dict.keySet.map(_.valueString).toSet + val unrecognizedKeys = + accepted_keys.diff(map_keys) union map_keys.diff(accepted_keys) + + if (!dict.nonEmpty){ + Map.empty[String, String].validNel + }else if (unrecognizedKeys.nonEmpty) { + s"Invalid keys in $key runtime attribute. Refer to 'ulimits' section on https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#containerProperties".invalidNel + } else { + dict + .collect { case (WomString(k), WomString(v)) => + (k, v) + // case _ => "!!! ERROR3".invalidNel + } + .toMap + .validNel + } + } + + private def sequenceNels( + nels: Vector[ErrorOr[Map[String, String]]] + ): ErrorOr[Vector[Map[String, String]]] = { + val emptyNel: ErrorOr[Vector[Map[String, String]]] = + Vector.empty[Map[String, String]].validNel + val seqNel: ErrorOr[Vector[Map[String, String]]] = + nels.foldLeft(emptyNel) { (acc, v) => + (acc, v) mapN { (a, v) => a :+ v } + } + seqNel + } + + override protected def missingValueMessage: String = + s"Expecting $key runtime attribute to be an Array[Map[String, String]]" +} \ No newline at end of file diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala index 5037fc210..4a7ea041b 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala @@ -113,6 +113,8 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi continueOnReturnCode = ContinueOnReturnCodeFlag(false), noAddress = false, scriptS3BucketName = "script-bucket", + awsBatchRetryAttempts = 1, + ulimits = Vector(Map.empty[String, String]), fileSystem = "s3") private def generateBasicJob: AwsBatchJob = { diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index 09e1ee943..f8c009f95 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -65,7 +65,9 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout false, ContinueOnReturnCodeSet(Set(0)), false, - "my-stuff") + "my-stuff", + 1, + Vector(Map.empty[String, String])) val expectedDefaultsLocalFS = new AwsBatchRuntimeAttributes(refineMV[Positive](1), Vector("us-east-1a", "us-east-1b"), @@ -76,6 +78,8 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout ContinueOnReturnCodeSet(Set(0)), false, "", + 1, + Vector(Map.empty[String, String]), "local") "AwsBatchRuntimeAttributes" should { @@ -339,6 +343,33 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout val expectedRuntimeAttributes = expectedDefaults.copy(cpu = refineMV[Positive](4)) assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedRuntimeAttributes, workflowOptions) } + + "validate a valid awsBatchRetryAttempts entry" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), 
"awsBatchRetryAttempts" -> WomInteger(9), "scriptBucketName" -> WomString("my-stuff")) + val expectedRuntimeAttributes = expectedDefaults.copy(awsBatchRetryAttempts = 9) + assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedRuntimeAttributes) + } + + "fail to validate with -1 as awsBatchRetryAttempts" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(-1), "scriptBucketName" -> WomString("my-stuff")) + assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value greater than or equal to 0") + } + + "fail to validate with 12 as awsBatchRetryAttempts" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(12), "scriptBucketName" -> WomString("my-stuff")) + assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") + } + + "fail to validate with a string as awsBatchRetryAttempts" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomString("test"), "scriptBucketName" -> WomString("my-stuff")) + assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute to be an Integer") + } + + "validate zero as awsBatchRetryAttempts entry" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(0), "scriptBucketName" -> WomString("my-stuff")) + val expectedRuntimeAttributes = expectedDefaults.copy(awsBatchRetryAttempts = 0) + assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedRuntimeAttributes) + } } private def assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes: Map[String, WomValue], diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala index 38545c7e4..682714b22 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala @@ -61,6 +61,7 @@ object AwsBatchTestConfig { | zones:["us-east-1a", "us-east-1b"] | queueArn: "arn:aws:batch:us-east-1:111222333444:job-queue/job-queue" | scriptBucketName: "my-bucket" + | awsBatchRetryAttempts: 1 |} | |""".stripMargin @@ -140,6 +141,7 @@ object AwsBatchTestConfigForLocalFS { | zones:["us-east-1a", "us-east-1b"] | queueArn: "arn:aws:batch:us-east-1:111222333444:job-queue/job-queue" | scriptBucketName: "" + | awsBatchRetryAttempts: 1 |} | |""".stripMargin diff --git a/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/linking/expression/values/EngineFunctionEvaluators.scala b/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/linking/expression/values/EngineFunctionEvaluators.scala index 0a2bf3b99..21f6f7516 100644 --- a/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/linking/expression/values/EngineFunctionEvaluators.scala +++ b/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/linking/expression/values/EngineFunctionEvaluators.scala @@ -51,7 +51,7 @@ object EngineFunctionEvaluators { EvaluatedValue(WomSingleFile(ioFunctionSet.pathFunctions.stderr), Seq.empty).validNel } - private val ReadWaitTimeout = 60.seconds + private val ReadWaitTimeout = 300.seconds private def 
readFile(fileToRead: WomSingleFile, ioFunctionSet: IoFunctionSet, sizeLimit: Int) = { Try(Await.result(ioFunctionSet.readFile(fileToRead.value, Option(sizeLimit), failOnOverflow = true), ReadWaitTimeout)) }
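
For context on how the pieces produced by `containerPropertiesBuilder`, `retryStrategyBuilder` and the SHA-1 derived name could come together, here is a minimal sketch against the AWS SDK v2 Batch client. It is illustrative only; `registerDefinition` and its parameters are assumptions, not code from this change.

```scala
import software.amazon.awssdk.services.batch.BatchClient
import software.amazon.awssdk.services.batch.model.{ContainerProperties, JobDefinitionType, RegisterJobDefinitionRequest, RegisterJobDefinitionResponse, RetryStrategy}

// Hypothetical helper: registers a job definition from the parts built above.
def registerDefinition(batch: BatchClient,
                       jobDefinitionName: String,
                       containerProps: ContainerProperties,
                       retryStrategy: RetryStrategy): RegisterJobDefinitionResponse =
  batch.registerJobDefinition(
    RegisterJobDefinitionRequest.builder()
      .jobDefinitionName(jobDefinitionName) // the deterministic, SHA-1 based name built above
      .`type`(JobDefinitionType.CONTAINER)  // Cromwell tasks run as container jobs
      .containerProperties(containerProps)  // image, command, memory, vcpus, volumes, ulimits
      .retryStrategy(retryStrategy)         // carries awsBatchRetryAttempts
      .build()
  )
```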
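
The `ulimits` runtime attribute travels through validation as a `Vector[Map[String, String]]` and is turned into SDK `Ulimit` values for the container properties. A minimal sketch of that mapping, assuming an already-validated value (the function name is illustrative):

```scala
import software.amazon.awssdk.services.batch.model.Ulimit

// Hypothetical converter from the validated runtime-attribute shape to SDK Ulimit objects.
def toSdkUlimits(ulimits: Vector[Map[String, String]]): List[Ulimit] =
  ulimits
    .filter(_.nonEmpty) // the default value is a single empty map, meaning "no ulimits"
    .map { u =>
      Ulimit.builder()
        .name(u("name")) // e.g. "nofile"
        .softLimit(u("softLimit").toInt)
        .hardLimit(u("hardLimit").toInt)
        .build()
    }
    .toList
```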
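
`AwsBatchRetryAttemptsValidation` accepts integers in the 0..10 range and reports violations through cats' `ValidatedNel`. A self-contained sketch of the same check with the `RuntimeAttributesValidation` machinery stripped away (the function name is illustrative):

```scala
import cats.data.ValidatedNel
import cats.syntax.validated._

// Standalone version of the 0..10 range check used for awsBatchRetryAttempts.
def validateRetryAttempts(key: String, value: Int): ValidatedNel[String, Int] =
  if (value < 0) s"Expecting $key runtime attribute value greater than or equal to 0".invalidNel
  else if (value > 10) s"Expecting $key runtime attribute value lower than or equal to 10".invalidNel
  else value.validNel

// validateRetryAttempts("awsBatchRetryAttempts", 9)  => Valid(9)
// validateRetryAttempts("awsBatchRetryAttempts", 12) => Invalid(NonEmptyList("Expecting ... lower than or equal to 10"))
```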
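
`UlimitsValidation.sequenceNels` folds a vector of `ValidatedNel` values with `mapN`, accumulating every error instead of stopping at the first. The same behaviour can be expressed with cats' `sequence`; a sketch for illustration only (the function name is an assumption):

```scala
import cats.data.ValidatedNel
import cats.implicits._

// Collapses per-entry validation results into one result, keeping all error messages.
def sequenceAccumulating[A](entries: Vector[ValidatedNel[String, A]]): ValidatedNel[String, Vector[A]] =
  entries.sequence

// sequenceAccumulating(Vector(1.validNel, "bad softLimit".invalidNel, "bad name".invalidNel))
//   => Invalid(NonEmptyList("bad softLimit", "bad name")) -- both errors are reported
```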
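
Finally, the `ReadWaitTimeout` change from 60 to 300 seconds controls how long `readFile` blocks on the asynchronous I/O call. A minimal sketch of that await-with-timeout pattern, using `slowRead` as an illustrative stand-in for `ioFunctionSet.readFile`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

val readWaitTimeout = 300.seconds // raised from 60 seconds in this change

// Stand-in for an asynchronous file read.
def slowRead(path: String): Future[String] = Future(s"contents of $path")

// Blocks for at most readWaitTimeout; a timeout or I/O failure surfaces as a Failure.
def readWithTimeout(path: String): Try[String] =
  Try(Await.result(slowRead(path), readWaitTimeout))
```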