package com.linkedin.camus.etl.kafka.common;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.class */
/**
 * {@link RecordWriterProvider} that writes records into a Hadoop {@link SequenceFile}.
 *
 * <p>Each appended entry uses a {@link LongWritable} key holding the value of
 * {@link IEtlKey#getTime()} and a {@link Text} value holding the record (cast to
 * {@code String}) followed by the configured record delimiter.
 *
 * <p>Compression is taken from the job's {@link SequenceFileOutputFormat} settings; when
 * output compression is disabled the file is written with
 * {@link SequenceFile.CompressionType#NONE} and no codec.
 */
public class SequenceFileRecordWriterProvider implements RecordWriterProvider {
    /** Configuration key holding the string appended to every record value. */
    public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";

    /** Delimiter used when {@link #ETL_OUTPUT_RECORD_DELIMITER} is not set: nothing is appended. */
    public static final String DEFAULT_RECORD_DELIMITER = "";

    private static final Logger log = Logger.getLogger(SequenceFileRecordWriterProvider.class);

    /**
     * Delimiter appended to each record. Stays {@code null} until the first call to
     * {@link #getDataRecordWriter}, which fills it from the job configuration unless it
     * has already been assigned.
     */
    protected String recordDelimiter = null;

    /**
     * Creates a provider. The context is accepted but not used by this constructor.
     *
     * @param taskAttemptContext the task attempt context (unused here)
     */
    public SequenceFileRecordWriterProvider(TaskAttemptContext taskAttemptContext) {
    }

    /**
     * Returns the extension passed to {@code EtlMultiOutputFormat.getUniqueFile} when
     * naming output files.
     *
     * @return the empty string
     */
    public String getFilenameExtension() {
        // Deliberately a literal: the extension is unrelated to the record delimiter default,
        // so it must not change if DEFAULT_RECORD_DELIMITER ever does.
        return "";
    }

    /**
     * Opens a {@link SequenceFile.Writer} under the committer's work path and returns a
     * {@link RecordWriter} that appends to it.
     *
     * @param taskAttemptContext  source of the job configuration and compression settings;
     *                            also handed to {@code SequenceFile.createWriter}
     * @param str                 name passed to {@code EtlMultiOutputFormat.getUniqueFile}
     *                            to build the output file name
     * @param camusWrapper        not used by this implementation
     * @param fileOutputCommitter supplies the work path the file is created in
     * @return a writer whose {@code write} appends one entry per record and whose
     *         {@code close} closes the underlying sequence file writer
     * @throws IOException          if the work path, file system or writer cannot be obtained
     * @throws InterruptedException declared by the provider contract
     */
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext taskAttemptContext, String str, CamusWrapper camusWrapper, FileOutputCommitter fileOutputCommitter) throws IOException, InterruptedException {
        Configuration configuration = taskAttemptContext.getConfiguration();

        // Resolve the delimiter only once; a value that is already set is left alone.
        if (this.recordDelimiter == null) {
            this.recordDelimiter = configuration.get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
        }

        // Uncompressed unless the job explicitly enabled output compression.
        CompressionCodec compressionCodec = null;
        SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.NONE;
        if (SequenceFileOutputFormat.getCompressOutput(taskAttemptContext)) {
            compressionType = SequenceFileOutputFormat.getOutputCompressionType(taskAttemptContext);
            // DefaultCodec is the fallback when no compressor class is configured.
            compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(
                    SequenceFileOutputFormat.getOutputCompressorClass(taskAttemptContext, DefaultCodec.class),
                    configuration);
        }

        Path path = new Path(
                fileOutputCommitter.getWorkPath(),
                EtlMultiOutputFormat.getUniqueFile(taskAttemptContext, str, getFilenameExtension()));

        log.info("Creating new SequenceFile.Writer with compression type " + compressionType + " and compression codec " + (compressionCodec != null ? compressionCodec.getClass().getName() : "null"));

        final SequenceFile.Writer sequenceWriter = SequenceFile.createWriter(
                path.getFileSystem(configuration),
                configuration,
                path,
                LongWritable.class,
                Text.class,
                compressionType,
                compressionCodec,
                taskAttemptContext);

        return new RecordWriter<IEtlKey, CamusWrapper>() {
            /**
             * Appends one entry: key is {@code key.getTime()}, value is the record cast to
             * {@code String} plus the provider's current {@code recordDelimiter}.
             */
            @Override
            public void write(IEtlKey key, CamusWrapper value) throws IOException, InterruptedException {
                // The delimiter is read from the enclosing provider on every call rather than
                // captured at creation time.
                sequenceWriter.append(
                        new LongWritable(key.getTime()),
                        new Text(((String) value.getRecord()) + SequenceFileRecordWriterProvider.this.recordDelimiter));
            }

            /** Closes the underlying sequence file writer. */
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                sequenceWriter.close();
            }
        };
    }
}
