package com.linkedin.camus.etl.kafka.mapred;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.Message;
import com.linkedin.camus.coders.MessageDecoder;
import com.linkedin.camus.etl.kafka.CamusJob;
import com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.EtlRequest;
import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
import com.linkedin.camus.etl.kafka.common.KafkaMessage;
import com.linkedin.camus.etl.kafka.common.KafkaReader;
import com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider;
import com.linkedin.camus.schemaregistry.SchemaNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.class */
public class EtlRecordReader extends RecordReader<EtlKey, CamusWrapper> {
    private static final String PRINT_MAX_DECODER_EXCEPTIONS = "max.decoder.exceptions.to.print";
    private static final String DEFAULT_SERVER = "server";
    private static final String DEFAULT_SERVICE = "service";
    private static final int RECORDS_TO_READ_AFTER_TIMEOUT = 5;
    protected TaskAttemptContext context;
    private EtlInputFormat inputFormat;
    private Mapper<EtlKey, Writable, EtlKey, Writable>.Context mapperContext;
    private KafkaReader reader;
    private long totalBytes;
    private MessageDecoder decoder;
    private CamusWrapper value;
    EtlSplit split;
    private static Logger log = Logger.getLogger(EtlRecordReader.class);
    private long readBytes = 0;
    private int numRecordsReadForCurrentPartition = 0;
    private long bytesReadForCurrentPartition = 0;
    private boolean skipSchemaErrors = false;
    private final BytesWritable msgValue = new BytesWritable();
    private final BytesWritable msgKey = new BytesWritable();
    private final EtlKey key = new EtlKey();
    private int maxPullHours = 0;
    private int exceptionCount = 0;
    private long maxPullTime = 0;
    private long endTimeStamp = 0;
    private long curTimeStamp = 0;
    private long startTime = 0;
    private HashSet<String> ignoreServerServiceList = null;
    private PeriodFormatter periodFormatter = null;
    private String statusMsg = SequenceFileRecordWriterProvider.DEFAULT_RECORD_DELIMITER;

    /* loaded from: input_file:com/linkedin/camus/etl/kafka/mapred/EtlRecordReader$KAFKA_MSG.class */
    public enum KAFKA_MSG {
        DECODE_SUCCESSFUL,
        SKIPPED_SCHEMA_NOT_FOUND,
        SKIPPED_OTHER
    }

    public EtlRecordReader(EtlInputFormat etlInputFormat, InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.inputFormat = etlInputFormat;
        initialize(inputSplit, taskAttemptContext);
    }

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        log.info("classpath: " + System.getProperty("java.class.path"));
        ClassLoader classLoader = EtlRecordReader.class.getClassLoader();
        log.info("PWD: " + System.getProperty("user.dir"));
        log.info("classloader: " + classLoader.getClass());
        log.info("org.apache.avro.Schema: " + classLoader.getResource("org/apache/avro/Schema.class"));
        this.split = (EtlSplit) inputSplit;
        this.context = taskAttemptContext;
        if (taskAttemptContext instanceof Mapper.Context) {
            this.mapperContext = (Mapper.Context) taskAttemptContext;
        }
        this.skipSchemaErrors = EtlInputFormat.getEtlIgnoreSchemaErrors(taskAttemptContext);
        if (EtlInputFormat.getKafkaMaxPullHrs(taskAttemptContext) != -1) {
            this.maxPullHours = EtlInputFormat.getKafkaMaxPullHrs(taskAttemptContext);
        } else {
            this.endTimeStamp = Long.MAX_VALUE;
        }
        if (EtlInputFormat.getKafkaMaxPullMinutesPerTask(taskAttemptContext) != -1) {
            this.startTime = System.currentTimeMillis();
            this.maxPullTime = new DateTime(this.startTime).plusMinutes(EtlInputFormat.getKafkaMaxPullMinutesPerTask(taskAttemptContext)).getMillis();
        } else {
            this.maxPullTime = Long.MAX_VALUE;
        }
        this.ignoreServerServiceList = new HashSet<>();
        for (String str : EtlInputFormat.getEtlAuditIgnoreServiceTopicList(taskAttemptContext)) {
            this.ignoreServerServiceList.add(str);
        }
        this.totalBytes = this.split.getLength();
        this.periodFormatter = new PeriodFormatterBuilder().appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s").toFormatter();
    }

    public synchronized void close() throws IOException {
        if (this.reader != null) {
            this.reader.close();
        }
    }

    private CamusWrapper getWrappedRecord(Message message) throws IOException {
        CamusWrapper camusWrapper = null;
        try {
            camusWrapper = this.decoder.decode(message);
            this.mapperContext.getCounter(KAFKA_MSG.DECODE_SUCCESSFUL).increment(1L);
        } catch (SchemaNotFoundException e) {
            this.mapperContext.getCounter(KAFKA_MSG.SKIPPED_SCHEMA_NOT_FOUND).increment(1L);
            if (!this.skipSchemaErrors) {
                throw new IOException((Throwable) e);
            }
        } catch (Exception e2) {
            this.mapperContext.getCounter(KAFKA_MSG.SKIPPED_OTHER).increment(1L);
            if (!this.skipSchemaErrors) {
                throw new IOException(e2);
            }
        }
        return camusWrapper;
    }

    private static byte[] getBytes(BytesWritable bytesWritable) {
        byte[] bytes = bytesWritable.getBytes();
        long length = bytesWritable.getLength();
        byte[] bArr = bytes;
        if (length < bytes.length) {
            bArr = new byte[(int) length];
            System.arraycopy(bytes, 0, bArr, 0, (int) length);
        }
        return bArr;
    }

    public float getProgress() throws IOException {
        if (getPos() == 0) {
            return 0.0f;
        }
        if (getPos() >= this.totalBytes) {
            return 1.0f;
        }
        return (float) (getPos() / this.totalBytes);
    }

    private long getPos() throws IOException {
        return this.readBytes;
    }

    /* renamed from: getCurrentKey, reason: merged with bridge method [inline-methods] */
    public EtlKey m15getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    /* renamed from: getCurrentValue, reason: merged with bridge method [inline-methods] */
    public CamusWrapper m14getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        int i;
        if (System.currentTimeMillis() > this.maxPullTime && this.numRecordsReadForCurrentPartition >= RECORDS_TO_READ_AFTER_TIMEOUT) {
            String str = "at " + new DateTime(this.curTimeStamp).toString();
            log.info("Kafka pull time limit reached");
            this.statusMsg += " max read " + str;
            this.context.setStatus(this.statusMsg);
            log.info(this.key.getTopic() + " max read " + str);
            this.mapperContext.getCounter("total", "request-time(ms)").increment(this.reader.getFetchTime());
            closeReader();
            String format = String.format("Topic %s:%d not fully pulled, max task time reached %s, pulled %d records", this.key.getTopic(), Integer.valueOf(this.key.getPartition()), str, Integer.valueOf(this.numRecordsReadForCurrentPartition));
            this.mapperContext.write(this.key, new ExceptionWritable(format));
            log.warn(format);
            String format2 = String.format("Time spent on topic %s:%d = %s", this.key.getTopic(), Integer.valueOf(this.key.getPartition()), this.periodFormatter.print(new Duration(this.startTime, System.currentTimeMillis()).toPeriod()));
            this.mapperContext.write(this.key, new ExceptionWritable(format2));
            log.info(format2);
            this.reader = null;
        }
        loop0: while (true) {
            try {
                if (this.reader == null || !this.reader.hasNext()) {
                    if (this.numRecordsReadForCurrentPartition != 0) {
                        log.info("Time spent on this partition = " + this.periodFormatter.print(new Duration(this.startTime, System.currentTimeMillis()).toPeriod()));
                        log.info("Num of records read for this partition = " + this.numRecordsReadForCurrentPartition);
                        log.info("Bytes read for this partition = " + this.bytesReadForCurrentPartition);
                        log.info("Actual avg size for this partition = " + (this.bytesReadForCurrentPartition / this.numRecordsReadForCurrentPartition));
                    }
                    EtlRequest etlRequest = (EtlRequest) this.split.popRequest();
                    if (etlRequest == null) {
                        return false;
                    }
                    this.startTime = System.currentTimeMillis();
                    this.numRecordsReadForCurrentPartition = 0;
                    this.bytesReadForCurrentPartition = 0L;
                    if (this.maxPullHours > 0) {
                        this.endTimeStamp = 0L;
                    }
                    this.key.set(etlRequest.getTopic(), etlRequest.getLeaderId(), etlRequest.getPartition(), etlRequest.getOffset(), etlRequest.getOffset(), 0L);
                    this.value = null;
                    log.info("\n\ntopic:" + etlRequest.getTopic() + " partition:" + etlRequest.getPartition() + " beginOffset:" + etlRequest.getOffset() + " estimatedLastOffset:" + etlRequest.getLastOffset());
                    this.statusMsg += (this.statusMsg.length() > 0 ? "; " : SequenceFileRecordWriterProvider.DEFAULT_RECORD_DELIMITER);
                    this.statusMsg += etlRequest.getTopic() + ":" + etlRequest.getLeaderId() + ":" + etlRequest.getPartition();
                    this.context.setStatus(this.statusMsg);
                    if (this.reader != null) {
                        closeReader();
                    }
                    this.reader = new KafkaReader(this.inputFormat, this.context, etlRequest, CamusJob.getKafkaTimeoutValue(this.mapperContext), CamusJob.getKafkaBufferSize(this.mapperContext));
                    this.decoder = createDecoder(etlRequest.getTopic());
                }
                i = 0;
            } catch (Throwable th) {
                Exception exc = new Exception(th.getLocalizedMessage(), th);
                exc.setStackTrace(th.getStackTrace());
                this.mapperContext.write(this.key, new ExceptionWritable(exc));
                this.reader = null;
            }
            while (true) {
                KafkaMessage next = this.reader.getNext(this.key);
                if (next == null) {
                    break;
                }
                this.readBytes += this.key.getMessageSize();
                i++;
                this.numRecordsReadForCurrentPartition++;
                this.bytesReadForCurrentPartition += this.key.getMessageSize();
                this.context.progress();
                this.mapperContext.getCounter("total", "data-read").increment(next.getPayload().length);
                this.mapperContext.getCounter("total", "event-count").increment(1L);
                next.validate();
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    CamusWrapper wrappedRecord = getWrappedRecord(next);
                    if (wrappedRecord == null) {
                        throw new RuntimeException("null record");
                        break loop0;
                    }
                    this.curTimeStamp = wrappedRecord.getTimestamp();
                    try {
                        this.key.setTime(this.curTimeStamp);
                        this.key.addAllPartitionMap(wrappedRecord.getPartitionMap());
                        setServerService();
                        if (this.endTimeStamp == 0) {
                            DateTime dateTime = new DateTime(this.curTimeStamp);
                            this.statusMsg += " begin read at " + dateTime.toString();
                            this.context.setStatus(this.statusMsg);
                            log.info(this.key.getTopic() + " begin read at " + dateTime.toString());
                            this.endTimeStamp = dateTime.plusHours(this.maxPullHours).getMillis();
                        } else if (this.curTimeStamp > this.endTimeStamp) {
                            String str2 = "at " + new DateTime(this.curTimeStamp).toString();
                            log.info("Kafka Max history hours reached");
                            this.mapperContext.write(this.key, new ExceptionWritable(String.format("Topic not fully pulled, max task time reached %s, pulled %d records", str2, Integer.valueOf(this.numRecordsReadForCurrentPartition))));
                            this.statusMsg += " max read " + str2;
                            this.context.setStatus(this.statusMsg);
                            log.info(this.key.getTopic() + " max read " + str2);
                            this.mapperContext.getCounter("total", "request-time(ms)").increment(this.reader.getFetchTime());
                            closeReader();
                        }
                        long currentTimeMillis2 = System.currentTimeMillis();
                        this.value = wrappedRecord;
                        this.mapperContext.getCounter("total", "decode-time(ms)").increment(currentTimeMillis2 - currentTimeMillis);
                        if (this.reader == null) {
                            return true;
                        }
                        this.mapperContext.getCounter("total", "request-time(ms)").increment(this.reader.getFetchTime());
                        return true;
                    } catch (Exception e) {
                        this.mapperContext.write(this.key, new ExceptionWritable(e));
                    }
                } catch (Exception e2) {
                    if (this.exceptionCount < getMaximumDecoderExceptionsToPrint(this.context)) {
                        this.mapperContext.write(this.key, new ExceptionWritable(e2));
                        log.info(e2.getMessage());
                        this.exceptionCount++;
                    } else if (this.exceptionCount == getMaximumDecoderExceptionsToPrint(this.context)) {
                        log.info("The same exception has occured for more than " + getMaximumDecoderExceptionsToPrint(this.context) + " records. All further exceptions will not be printed");
                    }
                    if (System.currentTimeMillis() > this.maxPullTime) {
                        this.exceptionCount = 0;
                        log.info("Records read : " + i);
                        this.reader = null;
                    }
                }
                Exception exc2 = new Exception(th.getLocalizedMessage(), th);
                exc2.setStackTrace(th.getStackTrace());
                this.mapperContext.write(this.key, new ExceptionWritable(exc2));
                this.reader = null;
            }
        }
    }

    protected MessageDecoder createDecoder(String str) {
        return MessageDecoderFactory.createMessageDecoder(this.context, str);
    }

    private void closeReader() throws IOException {
        if (this.reader != null) {
            try {
                this.reader.close();
                this.reader = null;
            } catch (Exception e) {
                this.reader = null;
            } catch (Throwable th) {
                this.reader = null;
                throw th;
            }
        }
    }

    public void setServerService() {
        if (this.ignoreServerServiceList.contains(this.key.getTopic()) || this.ignoreServerServiceList.contains("all")) {
            this.key.setServer(DEFAULT_SERVER);
            this.key.setService(DEFAULT_SERVICE);
        }
    }

    public static int getMaximumDecoderExceptionsToPrint(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(PRINT_MAX_DECODER_EXCEPTIONS, 10);
    }
}
