package com.linkedin.camus.etl.kafka.common;

import com.linkedin.camus.etl.kafka.CamusJob;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import kafka.api.PartitionFetchInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchRequest;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/common/KafkaReader.class */
public class KafkaReader {
    private static Logger log = Logger.getLogger(KafkaReader.class);
    private EtlRequest kafkaRequest;
    private SimpleConsumer simpleConsumer;
    private long beginOffset;
    private long currentOffset;
    private long lastOffset;
    private long currentCount;
    private TaskAttemptContext context;
    private long totalFetchTime;
    private int fetchBufferSize;
    private Iterator<MessageAndOffset> messageIter = null;
    private long lastFetchTime = 0;

    public KafkaReader(TaskAttemptContext taskAttemptContext, EtlRequest etlRequest, int i, int i2) throws Exception {
        this.kafkaRequest = null;
        this.simpleConsumer = null;
        this.totalFetchTime = 0L;
        this.fetchBufferSize = i2;
        this.context = taskAttemptContext;
        log.info("bufferSize=" + i2);
        log.info("timeout=" + i);
        this.kafkaRequest = etlRequest;
        this.beginOffset = etlRequest.getOffset();
        this.currentOffset = etlRequest.getOffset();
        this.lastOffset = etlRequest.getLastOffset();
        this.currentCount = 0L;
        this.totalFetchTime = 0L;
        URI uri = this.kafkaRequest.getURI();
        this.simpleConsumer = new SimpleConsumer(uri.getHost(), uri.getPort(), CamusJob.getKafkaTimeoutValue(taskAttemptContext), CamusJob.getKafkaBufferSize(taskAttemptContext), CamusJob.getKafkaClientName(taskAttemptContext));
        log.info("Connected to leader " + uri + " beginning reading at offset " + this.beginOffset + " latest offset=" + this.lastOffset);
        fetch();
    }

    public boolean hasNext() throws IOException {
        if (this.messageIter == null || !this.messageIter.hasNext()) {
            return fetch();
        }
        return true;
    }

    public boolean getNext(EtlKey etlKey, BytesWritable bytesWritable, BytesWritable bytesWritable2) throws IOException {
        if (!hasNext()) {
            return false;
        }
        MessageAndOffset next = this.messageIter.next();
        Message message = next.message();
        ByteBuffer payload = message.payload();
        int remaining = payload.remaining();
        byte[] bArr = new byte[remaining];
        payload.get(bArr, payload.position(), remaining);
        bytesWritable.set(bArr, 0, remaining);
        ByteBuffer key = message.key();
        if (key != null) {
            int remaining2 = key.remaining();
            byte[] bArr2 = new byte[remaining2];
            key.get(bArr2, key.position(), remaining2);
            bytesWritable2.set(bArr2, 0, remaining2);
        }
        etlKey.clear();
        etlKey.set(this.kafkaRequest.getTopic(), this.kafkaRequest.getLeaderId(), this.kafkaRequest.getPartition(), this.currentOffset, next.offset() + 1, message.checksum());
        this.currentOffset = next.offset() + 1;
        this.currentCount++;
        return true;
    }

    public boolean fetch() throws IOException {
        if (this.currentOffset >= this.lastOffset) {
            return false;
        }
        long currentTimeMillis = System.currentTimeMillis();
        TopicAndPartition topicAndPartition = new TopicAndPartition(this.kafkaRequest.getTopic(), this.kafkaRequest.getPartition());
        log.debug("\nAsking for offset : " + this.currentOffset);
        PartitionFetchInfo partitionFetchInfo = new PartitionFetchInfo(this.currentOffset, this.fetchBufferSize);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, partitionFetchInfo);
        try {
            FetchResponse fetch = this.simpleConsumer.fetch(new FetchRequest(CamusJob.getKafkaFetchRequestCorrelationId(this.context), CamusJob.getKafkaClientName(this.context), CamusJob.getKafkaFetchRequestMaxWait(this.context), CamusJob.getKafkaFetchRequestMinBytes(this.context), hashMap));
            if (fetch.hasError()) {
                log.info("Error encountered during a fetch request from Kafka");
                log.info("Error Code generated : " + ((int) fetch.errorCode(this.kafkaRequest.getTopic(), this.kafkaRequest.getPartition())));
                return false;
            }
            ByteBufferMessageSet messageSet = fetch.messageSet(this.kafkaRequest.getTopic(), this.kafkaRequest.getPartition());
            this.lastFetchTime = System.currentTimeMillis() - currentTimeMillis;
            log.debug("Time taken to fetch : " + (this.lastFetchTime / 1000) + " seconds");
            log.debug("The size of the ByteBufferMessageSet returned is : " + messageSet.sizeInBytes());
            int i = 0;
            this.totalFetchTime += this.lastFetchTime;
            this.messageIter = messageSet.iterator();
            Iterator it = messageSet.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                if (messageAndOffset.offset() >= this.currentOffset) {
                    log.debug("Skipped offsets till : " + messageAndOffset.offset());
                    break;
                }
                i++;
            }
            log.debug("Number of offsets to be skipped: " + i);
            while (i != 0) {
                log.debug("Skipping offset : " + this.messageIter.next().offset());
                i--;
            }
            if (this.messageIter.hasNext()) {
                return true;
            }
            System.out.println("No more data left to process. Returning false");
            this.messageIter = null;
            return false;
        } catch (Exception e) {
            log.info("Exception generated during fetch");
            e.printStackTrace();
            return false;
        }
    }

    public void close() throws IOException {
        if (this.simpleConsumer != null) {
            this.simpleConsumer.close();
        }
    }

    public long getTotalBytes() {
        if (this.lastOffset > this.beginOffset) {
            return this.lastOffset - this.beginOffset;
        }
        return 0L;
    }

    public long getReadBytes() {
        return this.currentOffset - this.beginOffset;
    }

    public long getCount() {
        return this.currentCount;
    }

    public long getFetchTime() {
        return this.lastFetchTime;
    }

    public long getTotalFetchTime() {
        return this.totalFetchTime;
    }
}
