package com.linkedin.camus.etl.kafka.common;

import com.linkedin.camus.etl.kafka.CamusJob;
import com.linkedin.camus.etl.kafka.mapred.EtlInputFormat;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import kafka.api.PartitionFetchInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchRequest;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/common/KafkaReader.class */
/**
 * Pulls messages for the single topic partition described by an {@link EtlRequest},
 * starting at the request's offset and stopping once the request's last offset is reached.
 *
 * <p>Messages are fetched in batches through a {@link SimpleConsumer}. When a fetch fails, the
 * partition leader is looked up again via a topic metadata request and the same fetch is
 * retried once.
 */
public class KafkaReader {
    private static final Logger log = Logger.getLogger(KafkaReader.class);

    private final EtlRequest kafkaRequest;
    // Not final: replaced by refreshTopicMetadata() when the leader is looked up again.
    private SimpleConsumer simpleConsumer;
    private final long beginOffset;
    private long currentOffset;
    private final long lastOffset;
    private long currentCount;
    private final TaskAttemptContext context;
    private long totalFetchTime;
    private final int fetchBufferSize;
    private Iterator<MessageAndOffset> messageIter = null;
    private long lastFetchTime = 0;

    /**
     * Connects to the host/port of the request's URI and performs an initial fetch.
     *
     * @param etlInputFormat factory used to create the {@link SimpleConsumer}
     * @param taskAttemptContext task context, also used to read the Kafka fetch settings
     * @param etlRequest topic, partition, URI and offset range to read
     * @param i value logged as {@code timeout}; it is not otherwise used by this class
     * @param i2 fetch size passed to every {@link PartitionFetchInfo}
     * @throws Exception if the consumer cannot be created or the initial fetch fails
     */
    public KafkaReader(EtlInputFormat etlInputFormat, TaskAttemptContext taskAttemptContext, EtlRequest etlRequest, int i, int i2) throws Exception {
        this.fetchBufferSize = i2;
        this.context = taskAttemptContext;
        log.info("bufferSize=" + i2);
        log.info("timeout=" + i);
        this.kafkaRequest = etlRequest;
        this.beginOffset = etlRequest.getOffset();
        this.currentOffset = etlRequest.getOffset();
        this.lastOffset = etlRequest.getLastOffset();
        this.currentCount = 0L;
        this.totalFetchTime = 0L;
        URI uri = this.kafkaRequest.getURI();
        this.simpleConsumer = etlInputFormat.createSimpleConsumer(taskAttemptContext, uri.getHost(), uri.getPort());
        log.info("Connected to leader " + uri + " beginning reading at offset " + this.beginOffset + " latest offset=" + this.lastOffset);
        fetch();
    }

    /**
     * Reports whether another message is available, fetching a new batch when the buffered
     * one is exhausted.
     *
     * @return {@code false} once the current offset has reached the last offset or a fetch
     *     yields nothing; {@code true} otherwise
     * @throws IOException declared by {@link #fetch()}
     */
    public boolean hasNext() throws IOException {
        if (this.currentOffset >= this.lastOffset) {
            return false;
        }
        if (this.messageIter != null && this.messageIter.hasNext()) {
            return true;
        }
        return fetch();
    }

    /**
     * Reads the next message, fills {@code etlKey} with its coordinates and advances the
     * current offset to one past the message's offset.
     *
     * @param etlKey key that is cleared and repopulated for the returned message
     * @return the next message, or {@code null} when {@link #hasNext()} is {@code false}
     * @throws IOException declared by {@link #hasNext()}
     */
    public KafkaMessage getNext(EtlKey etlKey) throws IOException {
        if (!hasNext()) {
            log.debug("KafkaReader has no more messages to read for " + this.kafkaRequest.getTopic() + ":" + this.kafkaRequest.getPartition() + ". currentOffset: " + this.currentOffset + ", lastOffset: " + this.lastOffset);
            return null;
        }
        MessageAndOffset next = this.messageIter.next();
        Message message = next.message();
        byte[] payload = getBytes(message.payload());
        byte[] key = getBytes(message.key());
        if (payload == null) {
            log.warn("Received message with null message.payload(): " + next);
        }
        long nextOffset = next.offset() + 1;
        etlKey.clear();
        etlKey.set(this.kafkaRequest.getTopic(), this.kafkaRequest.getLeaderId(), this.kafkaRequest.getPartition(), this.currentOffset, nextOffset, message.checksum());
        etlKey.setMessageSize(message.size());
        this.currentOffset = nextOffset;
        this.currentCount++;
        return new KafkaMessage(payload, key, this.kafkaRequest.getTopic(), this.kafkaRequest.getPartition(), next.offset(), message.checksum());
    }

    /**
     * Copies the remaining bytes of a buffer into a new array.
     *
     * @param byteBuffer buffer to drain; may be {@code null}
     * @return the remaining bytes, or {@code null} when {@code byteBuffer} is {@code null}
     */
    private byte[] getBytes(ByteBuffer byteBuffer) {
        if (byteBuffer == null) {
            return null;
        }
        byte[] bytes = new byte[byteBuffer.remaining()];
        // The destination offset must be 0. Passing the buffer's own position (as was done
        // before) overruns the array whenever that position is non-zero.
        byteBuffer.get(bytes);
        return bytes;
    }

    /**
     * Fetches the next batch of messages starting at the current offset.
     *
     * <p>Any exception, including the one raised for a response carrying an error code,
     * triggers a leader refresh followed by a single retry of the same request.
     *
     * @return {@code true} if the batch holds at least one message at or after the current
     *     offset; {@code false} if the last offset was already reached, the batch was empty,
     *     or the retry failed
     * @throws IOException kept for interface compatibility
     */
    public boolean fetch() throws IOException {
        if (this.currentOffset >= this.lastOffset) {
            log.debug("KafkaReader.fetch() for " + this.kafkaRequest.getTopic() + ":" + this.kafkaRequest.getPartition() + " currentOffset: " + this.currentOffset + " >= lastOffset: " + this.lastOffset + ". No more fetches");
            return false;
        }
        long fetchStart = System.currentTimeMillis();
        TopicAndPartition topicAndPartition = new TopicAndPartition(this.kafkaRequest.getTopic(), this.kafkaRequest.getPartition());
        PartitionFetchInfo partitionFetchInfo = new PartitionFetchInfo(this.currentOffset, this.fetchBufferSize);
        HashMap<TopicAndPartition, PartitionFetchInfo> fetchInfo = new HashMap<TopicAndPartition, PartitionFetchInfo>();
        fetchInfo.put(topicAndPartition, partitionFetchInfo);
        FetchRequest fetchRequest = new FetchRequest(CamusJob.getKafkaFetchRequestCorrelationId(this.context), CamusJob.getKafkaClientName(this.context), CamusJob.getKafkaFetchRequestMaxWait(this.context), CamusJob.getKafkaFetchRequestMinBytes(this.context), fetchInfo);
        try {
            FetchResponse response = this.simpleConsumer.fetch(fetchRequest);
            if (response.hasError()) {
                throw new RuntimeException("Error Code generated : " + ((int) response.errorCode(this.kafkaRequest.getTopic(), this.kafkaRequest.getPartition())) + StringRecordWriterProvider.DEFAULT_RECORD_DELIMITER);
            }
            return processFetchResponse(response, fetchStart);
        } catch (Exception e) {
            log.info("Exception generated during fetch for topic " + this.kafkaRequest.getTopic() + ": " + e.getMessage() + ". Will refresh topic metadata and retry.");
            return refreshTopicMetadataAndRetryFetch(fetchRequest, fetchStart);
        }
    }

    /**
     * Looks up the partition leader again and re-issues {@code fetchRequest} once.
     *
     * @param fetchRequest the request that failed
     * @param j start time of the original fetch, in milliseconds
     * @return the result of processing the retried response, or {@code false} on any failure
     */
    private boolean refreshTopicMetadataAndRetryFetch(FetchRequest fetchRequest, long j) {
        try {
            refreshTopicMetadata();
            FetchResponse response = this.simpleConsumer.fetch(fetchRequest);
            if (response.hasError()) {
                log.warn("Error encountered during fetch request retry from Kafka");
                log.warn("Error Code generated : " + ((int) response.errorCode(this.kafkaRequest.getTopic(), this.kafkaRequest.getPartition())));
                return false;
            }
            return processFetchResponse(response, j);
        } catch (Exception e) {
            // Keep the cause in the log; the topic is skipped rather than failing the task.
            log.info("Exception generated during fetch for topic " + this.kafkaRequest.getTopic() + ". This topic will be skipped.", e);
            return false;
        }
    }

    /**
     * Requests metadata for the topic and, if the requested partition is listed, replaces the
     * consumer with one connected to that partition's leader. Failures are logged and leave
     * the current consumer in place.
     */
    private void refreshTopicMetadata() {
        String topic = this.kafkaRequest.getTopic();
        TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Collections.singletonList(topic));
        try {
            TopicMetadata topicMetadata = (TopicMetadata) this.simpleConsumer.send(topicMetadataRequest).topicsMetadata().get(0);
            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                if (partitionMetadata.partitionId() == this.kafkaRequest.getPartition()) {
                    SimpleConsumer previous = this.simpleConsumer;
                    this.simpleConsumer = new SimpleConsumer(partitionMetadata.leader().host(), partitionMetadata.leader().port(), CamusJob.getKafkaTimeoutValue(this.context), CamusJob.getKafkaBufferSize(this.context), CamusJob.getKafkaClientName(this.context));
                    closeReplacedConsumer(previous);
                    return;
                }
            }
        } catch (Exception e) {
            log.error("Exception caught when refreshing metadata for topic " + topic + ": " + e.getMessage(), e);
        }
    }

    /** Closes a consumer that has just been replaced so its connection is not leaked. */
    private void closeReplacedConsumer(SimpleConsumer previous) {
        if (previous == null) {
            return;
        }
        try {
            previous.close();
        } catch (Exception e) {
            log.warn("Failed to close previous consumer for topic " + this.kafkaRequest.getTopic(), e);
        }
    }

    /**
     * Installs the response's message set as the current batch, records fetch timings and
     * skips leading messages whose offset is below the current offset.
     *
     * @param fetchResponse response to read the partition's message set from
     * @param j time the fetch was started, in milliseconds
     * @return {@code true} if a message remains after skipping; {@code false} if the batch is
     *     exhausted or an exception occurred
     */
    private boolean processFetchResponse(FetchResponse fetchResponse, long j) {
        try {
            ByteBufferMessageSet messageSet = fetchResponse.messageSet(this.kafkaRequest.getTopic(), this.kafkaRequest.getPartition());
            this.lastFetchTime = System.currentTimeMillis() - j;
            log.debug("Time taken to fetch " + this.kafkaRequest.getTopic() + ":" + this.kafkaRequest.getPartition() + " starting at offset " + this.kafkaRequest.getOffset() + ": " + (this.lastFetchTime / 1000) + " seconds");
            log.debug("The size of the ByteBufferMessageSet returned is : " + messageSet.sizeInBytes());
            this.totalFetchTime += this.lastFetchTime;
            this.messageIter = messageSet.iterator();

            // First pass on a separate iterator: count the leading messages that precede
            // the current offset, so messageIter can then be advanced past exactly those.
            int skipped = 0;
            Iterator<MessageAndOffset> probe = messageSet.iterator();
            while (probe.hasNext()) {
                MessageAndOffset candidate = probe.next();
                if (candidate.offset() >= this.currentOffset) {
                    log.debug("Skipped offsets for " + this.kafkaRequest.getTopic() + ":" + this.kafkaRequest.getPartition() + " until offset : " + candidate.offset());
                    break;
                }
                // This branch is reached only when the message offset is below the current
                // offset, so the message text says "less than".
                log.warn("Kafka fetch response for " + this.kafkaRequest.getTopic() + ":" + this.kafkaRequest.getPartition() + " at message offset " + candidate.offset() + " is less than current offset " + this.currentOffset + ", skipping message.");
                skipped++;
            }
            log.debug("Number of offsets to be skipped for " + this.kafkaRequest.getTopic() + ":" + this.kafkaRequest.getPartition() + ": " + skipped);
            while (skipped != 0) {
                log.debug("Skipping " + this.kafkaRequest.getTopic() + ":" + this.kafkaRequest.getPartition() + " offset : " + this.messageIter.next().offset());
                skipped--;
            }
            if (this.messageIter.hasNext()) {
                return true;
            }
            log.info("No more data left to process. Returning false");
            this.messageIter = null;
            return false;
        } catch (Exception e) {
            log.info("Exception generated during processing fetchResponse", e);
            return false;
        }
    }

    /**
     * Closes the underlying consumer, if any.
     *
     * @throws IOException kept for interface compatibility
     */
    public void close() throws IOException {
        if (this.simpleConsumer != null) {
            this.simpleConsumer.close();
        }
    }

    /**
     * Returns the size of the requested range, computed as last offset minus begin offset.
     * Despite the name, the value is an offset difference, not a byte count.
     *
     * @return the offset span, or 0 when the last offset does not exceed the begin offset
     */
    public long getTotalBytes() {
        if (this.lastOffset > this.beginOffset) {
            return this.lastOffset - this.beginOffset;
        }
        return 0L;
    }

    /**
     * Returns the progress so far, computed as current offset minus begin offset.
     * Despite the name, the value is an offset difference, not a byte count.
     *
     * @return the number of offsets advanced since the reader was created
     */
    public long getReadBytes() {
        return this.currentOffset - this.beginOffset;
    }

    /** @return the number of messages returned by {@link #getNext(EtlKey)} so far */
    public long getCount() {
        return this.currentCount;
    }

    /** @return the duration of the most recent processed fetch, in milliseconds */
    public long getFetchTime() {
        return this.lastFetchTime;
    }

    /** @return the accumulated duration of all processed fetches, in milliseconds */
    public long getTotalFetchTime() {
        return this.totalFetchTime;
    }
}
