用户可根据基本inputformat及recordreader代码修改得到自定义inputformat,此处代码提供Key为空,Value为整个文件内容的inputformat及recordreader,key与value间默认间隔符可以在执行时通过不同参数调整
修改好两个class文件后,依后文编译替换默认streaming.jar
 
WholeFileInputFormat
package org.apache.hadoop.streaming;
import java.io.IOException;
import org.apache.hadoop.fs.;
import org.apache.hadoop.io.;
import org.apache.hadoop.mapred.*;
// vv WholeFileInputFormat
public class WholeFileInputFormat
    extends FileInputFormat<Text, BytesWritable> {
  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return false;
  }
  @Override
  public RecordReader<Text, BytesWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    return new WholeFileRecordReader((FileSplit) split, job);
  }
}
// ^^ WholeFileInputFormat
package org.apache.hadoop.streaming;
// cc WholeFileRecordReader The RecordReader used by WholeFileInputFormat for reading a whole file as a record
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.;
import org.apache.hadoop.io.;
import org.apache.hadoop.mapred.*;
// vv WholeFileRecordReader
class WholeFileRecordReader implements RecordReader<Text, BytesWritable> {
  private FileSplit fileSplit;
  private Configuration conf;
  private boolean processed = false;
  public WholeFileRecordReader(FileSplit fileSplit, Configuration conf)
      throws IOException {
    this.fileSplit = fileSplit;
    this.conf = conf;
  }
  @Override
  public Text createKey() {
    return new Text();
  }
  @Override
  public BytesWritable createValue() {
    return new BytesWritable();
  }
  @Override
  public long getPos() throws IOException {
    return processed ? fileSplit.getLength() : 0;
  }
  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }
  @Override
  public boolean next(Text key, BytesWritable value) throws IOException {
    if (!processed) {
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
//      key.set(‘>’+file.toString());
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }
  @Override
  public void close() throws IOException {
    // do nothing
  }
}
// ^^ WholeFileRecordReader
 
#download hadoop-1.0.0
#download commons-logging-1.1.1.jar
mkdir /home/ubuntu/hadoop-1.0.0/contrib/streaming/new_test
cd /home/ubuntu/hadoop-1.0.0/contrib/streaming
#unpack the jar and a directory org is created
jar xvf hadoop-streaming-1.0.0.jar
#put the custom Inputformat and RecordReader (FastaInputFormat.java and FastaRecordReader.java) in org/apache/hadoop/streaming/ and compile
javac -classpath /home/ubuntu/hadoop-1.0.0/hadoop-core-1.0.0.jar ./org/apache/hadoop/streaming/ *.java
#create a jar
jar cvfm hadoop-streaming-1.0.3.jar /usr/local/hadoop/contrib/streaming/new/META-INF/MANIFEST.MF -C /usr/local/hadoop/contrib/streaming/new/ .
#replace the original jar
rm ../hadoop-streaming-1.0.0.jar -f
cp hadoop-streaming-1.0.0.jar ../ –f