package com.linkedin.camus.etl.kafka.mapred;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.joda.time.DateTime;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.class */
public class EtlMultiOutputRecordWriter extends RecordWriter<EtlKey, Object> {
    private TaskAttemptContext context;
    private SequenceFile.Writer errorWriter;
    private long beginTimeStamp;
    private EtlMultiOutputCommitter committer;
    private String currentTopic = "";
    private HashMap<String, RecordWriter<IEtlKey, CamusWrapper>> dataWriters = new HashMap<>();

    public EtlMultiOutputRecordWriter(TaskAttemptContext taskAttemptContext, EtlMultiOutputCommitter etlMultiOutputCommitter) throws IOException, InterruptedException {
        this.errorWriter = null;
        this.beginTimeStamp = 0L;
        this.context = taskAttemptContext;
        this.committer = etlMultiOutputCommitter;
        this.errorWriter = SequenceFile.createWriter(FileSystem.get(taskAttemptContext.getConfiguration()), taskAttemptContext.getConfiguration(), new Path(etlMultiOutputCommitter.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(taskAttemptContext, EtlMultiOutputFormat.ERRORS_PREFIX, "")), EtlKey.class, ExceptionWritable.class);
        if (EtlInputFormat.getKafkaMaxHistoricalDays(taskAttemptContext) == -1) {
            this.beginTimeStamp = 0L;
        } else {
            this.beginTimeStamp = new DateTime().minusDays(EtlInputFormat.getKafkaMaxHistoricalDays(taskAttemptContext)).getMillis();
        }
    }

    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Iterator<String> it = this.dataWriters.keySet().iterator();
        while (it.hasNext()) {
            this.dataWriters.get(it.next()).close(taskAttemptContext);
        }
        this.errorWriter.close();
    }

    public void write(EtlKey etlKey, Object obj) throws IOException, InterruptedException {
        if (!(obj instanceof CamusWrapper)) {
            if (obj instanceof ExceptionWritable) {
                this.committer.addOffset(etlKey);
                System.err.println(etlKey.toString());
                System.err.println(obj.toString());
                this.errorWriter.append(etlKey, (ExceptionWritable) obj);
                return;
            }
            return;
        }
        if (etlKey.getTime() < this.beginTimeStamp) {
            this.committer.addOffset(etlKey);
            return;
        }
        if (!etlKey.getTopic().equals(this.currentTopic)) {
            Iterator<RecordWriter<IEtlKey, CamusWrapper>> it = this.dataWriters.values().iterator();
            while (it.hasNext()) {
                it.next().close(this.context);
            }
            this.dataWriters.clear();
            this.currentTopic = etlKey.getTopic();
        }
        this.committer.addCounts(etlKey);
        CamusWrapper camusWrapper = (CamusWrapper) obj;
        String workingFileName = EtlMultiOutputFormat.getWorkingFileName(this.context, etlKey);
        if (!this.dataWriters.containsKey(workingFileName)) {
            this.dataWriters.put(workingFileName, getDataRecordWriter(this.context, workingFileName, camusWrapper));
        }
        this.dataWriters.get(workingFileName).write(etlKey, camusWrapper);
    }

    private RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext taskAttemptContext, String str, CamusWrapper camusWrapper) throws IOException, InterruptedException {
        try {
            return EtlMultiOutputFormat.getRecordWriterProviderClass(taskAttemptContext).newInstance().getDataRecordWriter(taskAttemptContext, str, camusWrapper, this.committer);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        } catch (InstantiationException e2) {
            throw new IllegalStateException(e2);
        }
    }
}
