/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
namespace Amazon.ElasticMapReduce.Model
{
/// <summary>
/// Class that makes it easy to define Hadoop Streaming steps.
/// <para>
/// See also: <a href="http://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html">Hadoop Streaming</a>
/// </para>
/// <code>
/// AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
/// IAmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
///
/// HadoopJarStepConfig config = new StreamingStep {
///     Inputs = new List&lt;string&gt; { "s3://elasticmapreduce/samples/wordcount/input" },
///     Output = "s3://my-bucket/output/",
///     Mapper = "s3://elasticmapreduce/samples/wordcount/wordSplitter.py",
///     Reducer = "aggregate"
/// }.ToHadoopJarStepConfig();
///
/// StepConfig wordCount = new StepConfig {
///     Name = "Word Count",
///     ActionOnFailure = "TERMINATE_JOB_FLOW",
///     HadoopJarStep = config
/// };
///
/// RunJobFlowRequest request = new RunJobFlowRequest {
///     Name = "Word Count",
///     Steps = new List&lt;StepConfig&gt; { wordCount },
///     LogUri = "s3://log-bucket/",
///     Instances = new JobFlowInstancesConfig {
///         Ec2KeyName = "keypair",
///         HadoopVersion = "0.20",
///         InstanceCount = 5,
///         KeepJobFlowAliveWhenNoSteps = true,
///         MasterInstanceType = "m1.small",
///         SlaveInstanceType = "m1.small"
///     }
/// };
///
/// RunJobFlowResponse response = emr.RunJobFlow(request);
/// </code>
/// </summary>
public class StreamingStep
{
    // Generic type arguments restored throughout: the declarations were
    // missing their <string> / <string, string> parameters and did not compile.
    List<string> inputs = new List<string>();
    string output;
    string mapper;
    string reducer;
    Dictionary<string, string> hadoopConfig = new Dictionary<string, string>();

    /// <summary>
    /// Creates a new default StreamingStep.
    /// </summary>
    public StreamingStep()
    {
    }

    /// <summary>
    /// Gets and sets the list of step input paths.
    /// </summary>
    public List<string> Inputs
    {
        get { return inputs; }
        set { this.inputs = value; }
    }

    /// <summary>
    /// Add more input paths to this step.
    /// </summary>
    /// <param name="inputs">A list of inputs to this step.</param>
    public void AddInputs(params string[] inputs)
    {
        if (this.inputs == null)
        {
            this.inputs = new List<string>();
        }
        foreach (string input in inputs)
        {
            this.inputs.Add(input);
        }
    }

    /// <summary>
    /// Gets and sets the output path.
    /// </summary>
    public string Output
    {
        get { return this.output; }
        set { this.output = value; }
    }

    /// <summary>
    /// Gets and sets the mapper.
    /// </summary>
    public string Mapper
    {
        get { return this.mapper; }
        set { this.mapper = value; }
    }

    /// <summary>
    /// Gets and sets the reducer.
    /// </summary>
    public string Reducer
    {
        get { return reducer; }
        set { this.reducer = value; }
    }

    /// <summary>
    /// Gets and sets the hadoopConfig.
    /// </summary>
    public Dictionary<string, string> HadoopConfig
    {
        get { return hadoopConfig; }
        set { this.hadoopConfig = value; }
    }

    /// <summary>
    /// Add a Hadoop config override (-D value). An existing value for the
    /// same key is replaced.
    /// </summary>
    /// <param name="key">Hadoop configuration key.</param>
    /// <param name="value">Configuration value.</param>
    public void AddHadoopConfig(string key, string value)
    {
        hadoopConfig[key] = value;
    }

    /// <summary>
    /// Creates the final HadoopJarStepConfig once you are done configuring the step. You can use
    /// this as you would any other HadoopJarStepConfig.
    /// </summary>
    /// <returns>HadoopJarStepConfig representing this streaming step.</returns>
    public HadoopJarStepConfig ToHadoopJarStepConfig()
    {
        List<string> args = new List<string>();

        // No reducer configured means a map-only job: tell Hadoop to run
        // zero reduce tasks. Guarded so a caller-supplied override is kept
        // and so calling this method twice does not throw on a duplicate Add.
        if (reducer == null && !hadoopConfig.ContainsKey("mapred.reduce.tasks"))
        {
            hadoopConfig.Add("mapred.reduce.tasks", "0");
        }

        foreach (KeyValuePair<string, string> entry in hadoopConfig)
        {
            args.Add("-D");
            // Must be "key=value" with no spaces around '=' — Hadoop's
            // generic option parser treats spaces as argument separators.
            args.Add(string.Format(CultureInfo.InvariantCulture, "{0}={1}", entry.Key, entry.Value));
        }

        foreach (string input in inputs)
        {
            args.Add("-input");
            args.Add(input);
        }
        if (output != null)
        {
            args.Add("-output");
            args.Add(output);
        }
        if (mapper != null)
        {
            args.Add("-mapper");
            args.Add(mapper);
        }
        if (reducer != null)
        {
            args.Add("-reducer");
            args.Add(reducer);
        }

        return new HadoopJarStepConfig
        {
            Jar = "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
            Args = args
        };
    }
}
}