package com.linkedin.camus.etl.kafka.mapred;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
import com.linkedin.camus.etl.kafka.CamusJob;
import com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.EtlRequest;
import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
import com.linkedin.camus.etl.kafka.common.KafkaReader;
import java.io.IOException;
import java.util.HashSet;
import kafka.message.Message;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.class */
public class EtlRecordReader extends RecordReader<EtlKey, CamusWrapper> {
    private static final String PRINT_MAX_DECODER_EXCEPTIONS = "max.decoder.exceptions.to.print";
    private static final String DEFAULT_SERVER = "server";
    private static final String DEFAULT_SERVICE = "service";
    private TaskAttemptContext context;
    private Mapper<EtlKey, Writable, EtlKey, Writable>.Context mapperContext;
    private KafkaReader reader;
    private long totalBytes;
    private MessageDecoder decoder;
    private CamusWrapper value;
    EtlSplit split;
    private static Logger log = Logger.getLogger(EtlRecordReader.class);
    private long readBytes = 0;
    private boolean skipSchemaErrors = false;
    private final BytesWritable msgValue = new BytesWritable();
    private final BytesWritable msgKey = new BytesWritable();
    private final EtlKey key = new EtlKey();
    private int maxPullHours = 0;
    private int exceptionCount = 0;
    private long maxPullTime = 0;
    private long beginTimeStamp = 0;
    private long endTimeStamp = 0;
    private HashSet<String> ignoreServerServiceList = null;
    private String statusMsg = "";

    public EtlRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        initialize(inputSplit, taskAttemptContext);
    }

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.split = (EtlSplit) inputSplit;
        this.context = taskAttemptContext;
        if (taskAttemptContext instanceof Mapper.Context) {
            this.mapperContext = (Mapper.Context) taskAttemptContext;
        }
        this.skipSchemaErrors = EtlInputFormat.getEtlIgnoreSchemaErrors(taskAttemptContext);
        if (EtlInputFormat.getKafkaMaxPullHrs(taskAttemptContext) != -1) {
            this.maxPullHours = EtlInputFormat.getKafkaMaxPullHrs(taskAttemptContext);
        } else {
            this.endTimeStamp = Long.MAX_VALUE;
        }
        if (EtlInputFormat.getKafkaMaxPullMinutesPerTask(taskAttemptContext) != -1) {
            this.maxPullTime = new DateTime().plusMinutes(EtlInputFormat.getKafkaMaxPullMinutesPerTask(taskAttemptContext)).getMillis();
        } else {
            this.maxPullTime = Long.MAX_VALUE;
        }
        if (EtlInputFormat.getKafkaMaxHistoricalDays(taskAttemptContext) != -1) {
            this.beginTimeStamp = new DateTime().minusDays(EtlInputFormat.getKafkaMaxHistoricalDays(taskAttemptContext)).getMillis();
        } else {
            this.beginTimeStamp = 0L;
        }
        this.ignoreServerServiceList = new HashSet<>();
        for (String str : EtlInputFormat.getEtlAuditIgnoreServiceTopicList(taskAttemptContext)) {
            this.ignoreServerServiceList.add(str);
        }
        this.totalBytes = this.split.getLength();
    }

    public synchronized void close() throws IOException {
        if (this.reader != null) {
            this.reader.close();
        }
    }

    private CamusWrapper getWrappedRecord(String str, byte[] bArr) throws IOException {
        CamusWrapper camusWrapper = null;
        try {
            camusWrapper = this.decoder.decode(bArr);
        } catch (Exception e) {
            if (!this.skipSchemaErrors) {
                throw new IOException(e);
            }
        }
        return camusWrapper;
    }

    private static byte[] getBytes(BytesWritable bytesWritable) {
        byte[] bytes = bytesWritable.getBytes();
        long length = bytesWritable.getLength();
        byte[] bArr = bytes;
        if (length < bytes.length) {
            bArr = new byte[(int) length];
            System.arraycopy(bytes, 0, bArr, 0, (int) length);
        }
        return bArr;
    }

    public float getProgress() throws IOException {
        if (getPos() == 0) {
            return 0.0f;
        }
        if (getPos() >= this.totalBytes) {
            return 1.0f;
        }
        return (float) (getPos() / this.totalBytes);
    }

    private long getPos() throws IOException {
        return this.reader != null ? this.readBytes + this.reader.getReadBytes() : this.readBytes;
    }

    /* renamed from: getCurrentKey, reason: merged with bridge method [inline-methods] */
    public EtlKey m13getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    /* renamed from: getCurrentValue, reason: merged with bridge method [inline-methods] */
    public CamusWrapper m12getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (System.currentTimeMillis() > this.maxPullTime) {
            log.info("Max pull time reached");
            if (this.reader == null) {
                return false;
            }
            closeReader();
            return false;
        }
        while (true) {
            try {
                if (this.reader == null || !this.reader.hasNext()) {
                    EtlRequest popRequest = this.split.popRequest();
                    if (popRequest == null) {
                        return false;
                    }
                    if (this.maxPullHours > 0) {
                        this.endTimeStamp = 0L;
                    }
                    this.key.set(popRequest.getTopic(), popRequest.getLeaderId(), popRequest.getPartition(), popRequest.getOffset(), popRequest.getOffset(), 0L);
                    this.value = null;
                    log.info("\n\ntopic:" + popRequest.getTopic() + " partition:" + popRequest.getPartition() + " beginOffset:" + popRequest.getOffset() + " estimatedLastOffset:" + popRequest.getLastOffset());
                    this.statusMsg += (this.statusMsg.length() > 0 ? "; " : "");
                    this.statusMsg += popRequest.getTopic() + ":" + popRequest.getLeaderId() + ":" + popRequest.getPartition();
                    this.context.setStatus(this.statusMsg);
                    if (this.reader != null) {
                        closeReader();
                    }
                    this.reader = new KafkaReader(this.context, popRequest, CamusJob.getKafkaTimeoutValue(this.mapperContext), CamusJob.getKafkaBufferSize(this.mapperContext));
                    this.decoder = MessageDecoderFactory.createMessageDecoder(this.context, popRequest.getTopic());
                }
                int i = 0;
                while (this.reader.getNext(this.key, this.msgValue, this.msgKey)) {
                    i++;
                    this.context.progress();
                    this.mapperContext.getCounter("total", "data-read").increment(this.msgValue.getLength());
                    this.mapperContext.getCounter("total", "event-count").increment(1L);
                    byte[] bytes = getBytes(this.msgValue);
                    Message message = new Message(bytes, getBytes(this.msgKey));
                    Message message2 = new Message(bytes);
                    long checksum = this.key.getChecksum();
                    if (checksum != message.checksum() && checksum != message2.checksum()) {
                        throw new ChecksumException("Invalid message checksum : MessageWithKey : " + message.checksum() + " MessageWithoutKey checksum : " + message2.checksum() + ". Expected " + this.key.getChecksum(), this.key.getOffset());
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    try {
                        CamusWrapper wrappedRecord = getWrappedRecord(this.key.getTopic(), bytes);
                        if (wrappedRecord == null) {
                            this.mapperContext.write(this.key, new ExceptionWritable(new RuntimeException("null record")));
                        } else {
                            long timestamp = wrappedRecord.getTimestamp();
                            try {
                                this.key.setTime(timestamp);
                                this.key.setPartition(wrappedRecord.getPartitionMap());
                                setServerService();
                                if (timestamp < this.beginTimeStamp) {
                                    this.mapperContext.getCounter("total", "skip-old").increment(1L);
                                } else if (this.endTimeStamp == 0) {
                                    DateTime dateTime = new DateTime(timestamp);
                                    this.statusMsg += " begin read at " + dateTime.toString();
                                    this.context.setStatus(this.statusMsg);
                                    log.info(this.key.getTopic() + " begin read at " + dateTime.toString());
                                    this.endTimeStamp = dateTime.plusHours(this.maxPullHours).getMillis();
                                } else if (timestamp > this.endTimeStamp || System.currentTimeMillis() > this.maxPullTime) {
                                    if (timestamp > this.endTimeStamp) {
                                        log.info("Kafka Max history hours reached");
                                    }
                                    if (System.currentTimeMillis() > this.maxPullTime) {
                                        log.info("Kafka pull time limit reached");
                                    }
                                    this.statusMsg += " max read at " + new DateTime(timestamp).toString();
                                    this.context.setStatus(this.statusMsg);
                                    log.info(this.key.getTopic() + " max read at " + new DateTime(timestamp).toString());
                                    this.mapperContext.getCounter("total", "request-time(ms)").increment(this.reader.getFetchTime());
                                    closeReader();
                                }
                                long currentTimeMillis2 = System.currentTimeMillis();
                                this.value = wrappedRecord;
                                this.mapperContext.getCounter("total", "decode-time(ms)").increment(currentTimeMillis2 - currentTimeMillis);
                                if (this.reader == null) {
                                    return true;
                                }
                                this.mapperContext.getCounter("total", "request-time(ms)").increment(this.reader.getFetchTime());
                                return true;
                            } catch (Exception e) {
                                this.mapperContext.write(this.key, new ExceptionWritable(e));
                            }
                        }
                    } catch (Exception e2) {
                        if (this.exceptionCount < getMaximumDecoderExceptionsToPrint(this.context)) {
                            this.mapperContext.write(this.key, new ExceptionWritable(e2));
                            this.exceptionCount++;
                        } else if (this.exceptionCount == getMaximumDecoderExceptionsToPrint(this.context)) {
                            this.exceptionCount = Integer.MAX_VALUE;
                            log.info("The same exception has occured for more than " + getMaximumDecoderExceptionsToPrint(this.context) + " records. All further exceptions will not be printed");
                        }
                    }
                }
                log.info("Records read : " + i);
                this.reader = null;
            } catch (Throwable th) {
                Exception exc = new Exception(th.getLocalizedMessage(), th);
                exc.setStackTrace(th.getStackTrace());
                this.mapperContext.write(this.key, new ExceptionWritable(exc));
                this.reader = null;
            }
        }
    }

    private void closeReader() throws IOException {
        if (this.reader != null) {
            try {
                this.readBytes += this.reader.getReadBytes();
                this.reader.close();
                this.reader = null;
            } catch (Exception e) {
                this.reader = null;
            } catch (Throwable th) {
                this.reader = null;
                throw th;
            }
        }
    }

    public void setServerService() {
        if (this.ignoreServerServiceList.contains(this.key.getTopic()) || this.ignoreServerServiceList.contains("all")) {
            this.key.setServer(DEFAULT_SERVER);
            this.key.setService(DEFAULT_SERVICE);
        }
    }

    public static int getMaximumDecoderExceptionsToPrint(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(PRINT_MAX_DECODER_EXCEPTIONS, 10);
    }
}
