/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

namespace Amazon.ElasticMapReduce.Model
{
    /// <summary>
    /// Class that makes it easy to define Hadoop Streaming steps.
    ///
    /// See also: <a href="http://hadoop.apache.org/common/docs/current/streaming.html">Hadoop Streaming</a>
    /// </summary>
    /// <example>
    /// <code>
    /// AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
    /// IAmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
    ///
    /// HadoopJarStepConfig config = new StreamingStep {
    ///     Inputs = new List&lt;string&gt; { "s3://elasticmapreduce/samples/wordcount/input" },
    ///     Output = "s3://my-bucket/output/",
    ///     Mapper = "s3://elasticmapreduce/samples/wordcount/wordSplitter.py",
    ///     Reducer = "aggregate"
    /// }.ToHadoopJarStepConfig();
    ///
    /// StepConfig wordCount = new StepConfig {
    ///     Name = "Word Count",
    ///     ActionOnFailure = "TERMINATE_JOB_FLOW",
    ///     HadoopJarStep = config
    /// };
    ///
    /// RunJobFlowRequest request = new RunJobFlowRequest {
    ///     Name = "Word Count",
    ///     Steps = new List&lt;StepConfig&gt; { wordCount },
    ///     LogUri = "s3://log-bucket/",
    ///     Instances = new JobFlowInstancesConfig {
    ///         Ec2KeyName = "keypair",
    ///         HadoopVersion = "0.20",
    ///         InstanceCount = 5,
    ///         KeepJobFlowAliveWhenNoSteps = true,
    ///         MasterInstanceType = "m1.small",
    ///         SlaveInstanceType = "m1.small"
    ///     }
    /// };
    ///
    /// RunJobFlowResponse response = emr.RunJobFlow(request);
    /// </code>
    /// </example>
    public class StreamingStep
    {
        List<string> inputs = new List<string>();
        string output;
        string mapper;
        string reducer;
        Dictionary<string, string> hadoopConfig = new Dictionary<string, string>();

        /// <summary>
        /// Creates a new default StreamingStep.
        /// </summary>
        public StreamingStep()
        {
        }

        /// <summary>
        /// Gets and sets the list of step input paths.
        /// </summary>
        public List<string> Inputs
        {
            get { return inputs; }
            set { this.inputs = value; }
        }

        /// <summary>
        /// Add more input paths to this step.
        /// </summary>
        /// <param name="inputs">A list of inputs to this step.</param>
        public void AddInputs(params string[] inputs)
        {
            // Inputs may have been set to null via the property setter.
            if (this.inputs == null)
            {
                this.inputs = new List<string>();
            }
            foreach (string input in inputs)
            {
                this.inputs.Add(input);
            }
        }

        /// <summary>
        /// Gets and sets the output path.
        /// </summary>
        public string Output
        {
            get { return this.output; }
            set { this.output = value; }
        }

        /// <summary>
        /// Gets and sets the mapper.
        /// </summary>
        public string Mapper
        {
            get { return this.mapper; }
            set { this.mapper = value; }
        }

        /// <summary>
        /// Gets and sets the reducer.
        /// </summary>
        public string Reducer
        {
            get { return reducer; }
            set { this.reducer = value; }
        }

        /// <summary>
        /// Gets and sets the Hadoop config overrides (emitted as -D key=value arguments).
        /// </summary>
        public Dictionary<string, string> HadoopConfig
        {
            get { return hadoopConfig; }
            set { this.hadoopConfig = value; }
        }

        /// <summary>
        /// Add a Hadoop config override (-D value). An existing value for the
        /// same key is replaced.
        /// </summary>
        /// <param name="key">Hadoop configuration key.</param>
        /// <param name="value">Configuration value.</param>
        public void AddHadoopConfig(string key, string value)
        {
            hadoopConfig[key] = value;
        }

        /// <summary>
        /// Creates the final HadoopJarStepConfig once you are done configuring the step.
        /// You can use this as you would any other HadoopJarStepConfig.
        /// </summary>
        /// <returns>HadoopJarStepConfig representing this streaming step.</returns>
        public HadoopJarStepConfig ToHadoopJarStepConfig()
        {
            List<string> args = new List<string>();

            // A streaming job with no reducer must explicitly disable reduce tasks.
            // Guard with ContainsKey so a second call to this method (or an explicit
            // caller-supplied value) does not throw ArgumentException from Add.
            if (reducer == null && !hadoopConfig.ContainsKey("mapred.reduce.tasks"))
            {
                hadoopConfig["mapred.reduce.tasks"] = "0";
            }

            foreach (KeyValuePair<string, string> entry in hadoopConfig)
            {
                args.Add("-D");
                // Hadoop's generic -D option requires "key=value" with no whitespace
                // around the '='; spaces would split the property name from its value.
                args.Add(string.Format(CultureInfo.InvariantCulture, "{0}={1}", entry.Key, entry.Value));
            }

            foreach (string input in inputs)
            {
                args.Add("-input");
                args.Add(input);
            }

            if (output != null)
            {
                args.Add("-output");
                args.Add(output);
            }

            if (mapper != null)
            {
                args.Add("-mapper");
                args.Add(mapper);
            }

            if (reducer != null)
            {
                args.Add("-reducer");
                args.Add(reducer);
            }

            return new HadoopJarStepConfig
            {
                Jar = "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
                Args = args
            };
        }
    }
}