package com.linkedin.camus.etl.kafka.mapred;

import com.google.common.base.Strings;
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
import com.linkedin.camus.etl.kafka.CamusJob;
import com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder;
import com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory;
import com.linkedin.camus.etl.kafka.common.EmailClient;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.EtlRequest;
import com.linkedin.camus.etl.kafka.common.LeaderInfo;
import com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider;
import com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider;
import com.linkedin.camus.workallocater.CamusRequest;
import com.linkedin.camus.workallocater.WorkAllocator;
import java.io.IOException;
import java.net.URI;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/mapred/EtlInputFormat.class */
public class EtlInputFormat extends InputFormat<EtlKey, CamusWrapper> {
    public static final String KAFKA_BLACKLIST_TOPIC = "kafka.blacklist.topics";
    public static final String KAFKA_WHITELIST_TOPIC = "kafka.whitelist.topics";
    public static final String KAFKA_MOVE_TO_LAST_OFFSET_LIST = "kafka.move.to.last.offset.list";
    public static final String KAFKA_MOVE_TO_EARLIEST_OFFSET = "kafka.move.to.earliest.offset";
    public static final String KAFKA_CLIENT_BUFFER_SIZE = "kafka.client.buffer.size";
    public static final String KAFKA_CLIENT_SO_TIMEOUT = "kafka.client.so.timeout";
    public static final String KAFKA_MAX_PULL_HRS = "kafka.max.pull.hrs";
    public static final String KAFKA_MAX_PULL_MINUTES_PER_TASK = "kafka.max.pull.minutes.per.task";
    public static final String KAFKA_MAX_HISTORICAL_DAYS = "kafka.max.historical.days";
    public static final String CAMUS_MESSAGE_DECODER_CLASS = "camus.message.decoder.class";
    public static final String ETL_IGNORE_SCHEMA_ERRORS = "etl.ignore.schema.errors";
    public static final String ETL_AUDIT_IGNORE_SERVICE_TOPIC_LIST = "etl.audit.ignore.service.topic.list";
    public static final String CAMUS_WORK_ALLOCATOR_CLASS = "camus.work.allocator.class";
    public static final String CAMUS_WORK_ALLOCATOR_DEFAULT = "com.linkedin.camus.workallocater.BaseAllocator";
    private static final int BACKOFF_UNIT_MILLISECONDS = 1000;
    public static final int NUM_TRIES_PARTITION_METADATA = 3;
    public static final int NUM_TRIES_FETCH_FROM_LEADER = 3;
    public static final int NUM_TRIES_TOPIC_METADATA = 3;
    public static boolean reportJobFailureDueToOffsetOutOfRange = false;
    public static boolean reportJobFailureUnableToGetOffsetFromKafka = false;
    public static boolean reportJobFailureDueToLeaderNotAvailable = false;
    private static Logger log = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/camus/etl/kafka/mapred/EtlInputFormat$OffsetFileFilter.class */
    public class OffsetFileFilter implements PathFilter {
        private OffsetFileFilter() {
        }

        public boolean accept(Path path) {
            return path.getName().startsWith(EtlMultiOutputFormat.OFFSET_PREFIX);
        }
    }

    public EtlInputFormat() {
        if (log == null) {
            log = Logger.getLogger(getClass());
        }
    }

    public static void setLogger(Logger logger) {
        log = logger;
    }

    public RecordReader<EtlKey, CamusWrapper> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new EtlRecordReader(this, inputSplit, taskAttemptContext);
    }

    public List<TopicMetadata> getKafkaMetadata(JobContext jobContext, List<String> list) {
        CamusJob.startTiming("kafkaSetupTime");
        String kafkaBrokers = CamusJob.getKafkaBrokers(jobContext);
        if (kafkaBrokers.isEmpty()) {
            throw new InvalidParameterException("kafka.brokers must contain at least one node");
        }
        List asList = Arrays.asList(kafkaBrokers.split("\\s*,\\s*"));
        Collections.shuffle(asList);
        boolean z = false;
        int i = 0;
        List<TopicMetadata> list2 = null;
        Exception exc = null;
        while (i < asList.size() && !z) {
            SimpleConsumer createBrokerConsumer = createBrokerConsumer(jobContext, (String) asList.get(i));
            log.info(String.format("Fetching metadata from broker %s with client id %s for %d topic(s) %s", asList.get(i), createBrokerConsumer.clientId(), Integer.valueOf(list.size()), list));
            for (int i2 = 0; i2 < 3; i2++) {
                try {
                    try {
                        list2 = createBrokerConsumer.send(new TopicMetadataRequest(list)).topicsMetadata();
                        z = true;
                        break;
                    } catch (Exception e) {
                        exc = e;
                        log.warn(String.format("Fetching topic metadata with client id %s for topics [%s] from broker [%s] failed, iter[%s]", createBrokerConsumer.clientId(), list, asList.get(i), Integer.valueOf(i2)), e);
                        try {
                            Thread.sleep((long) (Math.random() * (i2 + 1) * 1000.0d));
                        } catch (InterruptedException e2) {
                            log.warn("Caught InterruptedException: " + e2);
                        }
                    }
                } finally {
                    createBrokerConsumer.close();
                    int i3 = i + 1;
                }
            }
        }
        if (!z) {
            throw new RuntimeException("Failed to obtain metadata!", exc);
        }
        CamusJob.stopTiming("kafkaSetupTime");
        return list2;
    }

    private SimpleConsumer createBrokerConsumer(JobContext jobContext, String str) {
        if (!str.matches(".+:\\d+")) {
            throw new InvalidParameterException("The kakfa broker " + str + " must follow address:port pattern");
        }
        String[] split = str.split(":");
        return createSimpleConsumer(jobContext, split[0], Integer.valueOf(split[1]).intValue());
    }

    public SimpleConsumer createSimpleConsumer(JobContext jobContext, String str, int i) {
        return new SimpleConsumer(str, i, CamusJob.getKafkaTimeoutValue(jobContext), CamusJob.getKafkaBufferSize(jobContext), CamusJob.getKafkaClientName(jobContext));
    }

    public ArrayList<CamusRequest> fetchLatestOffsetAndCreateEtlRequests(JobContext jobContext, HashMap<LeaderInfo, ArrayList<TopicAndPartition>> hashMap) {
        ArrayList<CamusRequest> arrayList = new ArrayList<>();
        for (LeaderInfo leaderInfo : hashMap.keySet()) {
            SimpleConsumer createSimpleConsumer = createSimpleConsumer(jobContext, leaderInfo.getUri().getHost(), leaderInfo.getUri().getPort());
            PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
            PartitionOffsetRequestInfo partitionOffsetRequestInfo2 = new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1);
            HashMap hashMap2 = new HashMap();
            HashMap hashMap3 = new HashMap();
            ArrayList<TopicAndPartition> arrayList2 = hashMap.get(leaderInfo);
            Iterator<TopicAndPartition> it = arrayList2.iterator();
            while (it.hasNext()) {
                TopicAndPartition next = it.next();
                hashMap2.put(next, partitionOffsetRequestInfo);
                hashMap3.put(next, partitionOffsetRequestInfo2);
            }
            OffsetResponse latestOffsetResponse = getLatestOffsetResponse(createSimpleConsumer, hashMap2, jobContext);
            OffsetResponse latestOffsetResponse2 = latestOffsetResponse != null ? getLatestOffsetResponse(createSimpleConsumer, hashMap3, jobContext) : null;
            createSimpleConsumer.close();
            if (latestOffsetResponse2 == null) {
                log.warn(generateLogWarnForSkippedTopics(hashMap3, createSimpleConsumer));
                reportJobFailureUnableToGetOffsetFromKafka = true;
            } else {
                Iterator<TopicAndPartition> it2 = arrayList2.iterator();
                while (it2.hasNext()) {
                    TopicAndPartition next2 = it2.next();
                    long j = latestOffsetResponse.offsets(next2.topic(), next2.partition())[0];
                    long j2 = latestOffsetResponse2.offsets(next2.topic(), next2.partition())[0];
                    EtlRequest etlRequest = new EtlRequest(jobContext, next2.topic(), Integer.toString(leaderInfo.getLeaderId()), next2.partition(), leaderInfo.getUri());
                    etlRequest.setLatestOffset(j);
                    etlRequest.setEarliestOffset(j2);
                    arrayList.add(etlRequest);
                }
            }
        }
        return arrayList;
    }

    protected OffsetResponse getLatestOffsetResponse(SimpleConsumer simpleConsumer, Map<TopicAndPartition, PartitionOffsetRequestInfo> map, JobContext jobContext) {
        for (int i = 0; i < 3; i++) {
            try {
                OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(map, OffsetRequest.CurrentVersion(), CamusJob.getKafkaClientName(jobContext)));
                if (!offsetsBefore.hasError()) {
                    return offsetsBefore;
                }
                throw new RuntimeException("offsetReponse has error.");
                break;
            } catch (Exception e) {
                log.warn("Fetching offset from leader " + simpleConsumer.host() + ":" + simpleConsumer.port() + " has failed " + (i + 1) + " time(s). Reason: " + e.getMessage() + " " + ((3 - i) - 1) + " retries left.");
                if (i < 2) {
                    try {
                        Thread.sleep((long) (Math.random() * (i + 1) * 1000.0d));
                    } catch (InterruptedException e2) {
                        log.error("Caught interrupted exception between retries of getting latest offsets. " + e2.getMessage());
                    }
                }
            }
        }
        return null;
    }

    private String generateLogWarnForSkippedTopics(Map<TopicAndPartition, PartitionOffsetRequestInfo> map, SimpleConsumer simpleConsumer) {
        StringBuilder sb = new StringBuilder();
        sb.append("The following topics will be skipped due to failure in fetching latest offsets from leader " + simpleConsumer.host() + ":" + simpleConsumer.port());
        Iterator<TopicAndPartition> it = map.keySet().iterator();
        while (it.hasNext()) {
            sb.append("  " + it.next().topic());
        }
        return sb.toString();
    }

    public String createTopicRegEx(HashSet<String> hashSet) {
        StringBuilder sb = new StringBuilder();
        Iterator<String> it = hashSet.iterator();
        while (it.hasNext()) {
            sb.append(it.next());
            sb.append("|");
        }
        String str = "(" + sb.substring(0, sb.length() - 1) + ")";
        Pattern.compile(str);
        return str;
    }

    public List<TopicMetadata> filterWhitelistTopics(List<TopicMetadata> list, HashSet<String> hashSet) {
        ArrayList arrayList = new ArrayList();
        String createTopicRegEx = createTopicRegEx(hashSet);
        for (TopicMetadata topicMetadata : list) {
            if (Pattern.matches(createTopicRegEx, topicMetadata.topic())) {
                arrayList.add(topicMetadata);
            } else {
                log.info("Discarding topic : " + topicMetadata.topic());
            }
        }
        return arrayList;
    }

    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        CamusJob.startTiming("getSplits");
        HashMap<LeaderInfo, ArrayList<TopicAndPartition>> hashMap = new HashMap<>();
        try {
            List<TopicMetadata> kafkaMetadata = getKafkaMetadata(jobContext, new ArrayList());
            HashSet<String> hashSet = new HashSet<>(Arrays.asList(getKafkaWhitelistTopic(jobContext)));
            if (!hashSet.isEmpty()) {
                kafkaMetadata = filterWhitelistTopics(kafkaMetadata, hashSet);
            }
            HashSet<String> hashSet2 = new HashSet<>(Arrays.asList(getKafkaBlacklistTopic(jobContext)));
            String str = SequenceFileRecordWriterProvider.DEFAULT_RECORD_DELIMITER;
            if (!hashSet2.isEmpty()) {
                str = createTopicRegEx(hashSet2);
            }
            for (TopicMetadata topicMetadata : kafkaMetadata) {
                if (Pattern.matches(str, topicMetadata.topic())) {
                    log.info("Discarding topic (blacklisted): " + topicMetadata.topic());
                } else if (!createMessageDecoder(jobContext, topicMetadata.topic())) {
                    log.info("Discarding topic (Decoder generation failed) : " + topicMetadata.topic());
                } else if (topicMetadata.errorCode() != ErrorMapping.NoError()) {
                    log.info("Skipping the creation of ETL request for Whole Topic : " + topicMetadata.topic() + " Exception : " + ErrorMapping.exceptionFor(topicMetadata.errorCode()));
                } else {
                    Iterator it = topicMetadata.partitionsMetadata().iterator();
                    while (it.hasNext()) {
                        PartitionMetadata refreshPartitionMetadataOnLeaderNotAvailable = refreshPartitionMetadataOnLeaderNotAvailable((PartitionMetadata) it.next(), topicMetadata, jobContext, 3);
                        if (refreshPartitionMetadataOnLeaderNotAvailable.errorCode() == ErrorMapping.LeaderNotAvailableCode()) {
                            log.info("Skipping the creation of ETL request for Topic : " + topicMetadata.topic() + " and Partition : " + refreshPartitionMetadataOnLeaderNotAvailable.partitionId() + " Exception : " + ErrorMapping.exceptionFor(refreshPartitionMetadataOnLeaderNotAvailable.errorCode()));
                            reportJobFailureDueToLeaderNotAvailable = true;
                        } else {
                            if (refreshPartitionMetadataOnLeaderNotAvailable.errorCode() != ErrorMapping.NoError()) {
                                log.warn("Receiving non-fatal error code, Continuing the creation of ETL request for Topic : " + topicMetadata.topic() + " and Partition : " + refreshPartitionMetadataOnLeaderNotAvailable.partitionId() + " Exception : " + ErrorMapping.exceptionFor(refreshPartitionMetadataOnLeaderNotAvailable.errorCode()));
                            }
                            LeaderInfo leaderInfo = new LeaderInfo(new URI("tcp://" + refreshPartitionMetadataOnLeaderNotAvailable.leader().getConnectionString()), refreshPartitionMetadataOnLeaderNotAvailable.leader().id());
                            if (hashMap.containsKey(leaderInfo)) {
                                ArrayList<TopicAndPartition> arrayList = hashMap.get(leaderInfo);
                                arrayList.add(new TopicAndPartition(topicMetadata.topic(), refreshPartitionMetadataOnLeaderNotAvailable.partitionId()));
                                hashMap.put(leaderInfo, arrayList);
                            } else {
                                ArrayList<TopicAndPartition> arrayList2 = new ArrayList<>();
                                arrayList2.add(new TopicAndPartition(topicMetadata.topic(), refreshPartitionMetadataOnLeaderNotAvailable.partitionId()));
                                hashMap.put(leaderInfo, arrayList2);
                            }
                        }
                    }
                }
            }
            ArrayList<CamusRequest> fetchLatestOffsetAndCreateEtlRequests = fetchLatestOffsetAndCreateEtlRequests(jobContext, hashMap);
            Collections.sort(fetchLatestOffsetAndCreateEtlRequests, new Comparator<CamusRequest>() { // from class: com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.1
                @Override // java.util.Comparator
                public int compare(CamusRequest camusRequest, CamusRequest camusRequest2) {
                    return camusRequest.getTopic().compareTo(camusRequest2.getTopic());
                }
            });
            writeRequests(fetchLatestOffsetAndCreateEtlRequests, jobContext);
            Map<CamusRequest, EtlKey> previousOffsets = getPreviousOffsets(FileInputFormat.getInputPaths(jobContext), jobContext);
            Set<String> moveToLatestTopicsSet = getMoveToLatestTopicsSet(jobContext);
            String str2 = SequenceFileRecordWriterProvider.DEFAULT_RECORD_DELIMITER;
            Iterator<CamusRequest> it2 = fetchLatestOffsetAndCreateEtlRequests.iterator();
            while (it2.hasNext()) {
                CamusRequest next = it2.next();
                if (moveToLatestTopicsSet.contains(next.getTopic()) || moveToLatestTopicsSet.contains("all")) {
                    log.info("Moving to latest for topic: " + next.getTopic());
                    EtlKey etlKey = previousOffsets.get(next);
                    EtlKey etlKey2 = new EtlKey(next.getTopic(), ((EtlRequest) next).getLeaderId(), next.getPartition(), 0L, next.getLastOffset());
                    if (etlKey != null) {
                        etlKey2.setMessageSize(etlKey.getMessageSize());
                    }
                    previousOffsets.put(next, etlKey2);
                }
                EtlKey etlKey3 = previousOffsets.get(next);
                if (etlKey3 != null) {
                    next.setOffset(etlKey3.getOffset());
                    next.setAvgMsgSize(etlKey3.getMessageSize());
                }
                if (next.getEarliestOffset() > next.getOffset() || next.getOffset() > next.getLastOffset()) {
                    if (next.getEarliestOffset() > next.getOffset()) {
                        log.error("The earliest offset was found to be more than the current offset: " + next);
                    } else {
                        log.error("The current offset was found to be more than the latest offset: " + next);
                    }
                    boolean z = jobContext.getConfiguration().getBoolean(KAFKA_MOVE_TO_EARLIEST_OFFSET, false);
                    boolean z2 = next.getOffset() == 0;
                    log.info("move_to_earliest: " + z + " offset_unset: " + z2);
                    if (z || z2) {
                        log.error("Moving to the earliest offset available");
                        next.setOffset(next.getEarliestOffset());
                        previousOffsets.put(next, new EtlKey(next.getTopic(), ((EtlRequest) next).getLeaderId(), next.getPartition(), 0L, next.getOffset()));
                    } else {
                        log.error("Offset range from kafka metadata is outside the previously persisted offset, " + next + StringRecordWriterProvider.DEFAULT_RECORD_DELIMITER + " Topic " + next.getTopic() + " will be skipped.\n Please check whether kafka cluster configuration is correct. You can also specify config parameter: " + KAFKA_MOVE_TO_EARLIEST_OFFSET + " to start processing from earliest kafka metadata offset.");
                        reportJobFailureDueToOffsetOutOfRange = true;
                    }
                } else if (3 * (next.getOffset() - next.getEarliestOffset()) < next.getLastOffset() - next.getOffset()) {
                    str2 = str2 + "The current offset is too close to the earliest offset, Camus might be falling behind: " + next + StringRecordWriterProvider.DEFAULT_RECORD_DELIMITER;
                }
                log.info(next);
            }
            if (!Strings.isNullOrEmpty(str2)) {
                EmailClient.sendEmail(str2);
            }
            writePrevious(previousOffsets.values(), jobContext);
            CamusJob.stopTiming("getSplits");
            CamusJob.startTiming("hadoop");
            CamusJob.setTime("hadoop_start");
            WorkAllocator workAllocator = getWorkAllocator(jobContext);
            Properties properties = new Properties();
            properties.putAll(jobContext.getConfiguration().getValByRegex(".*"));
            workAllocator.init(properties);
            return workAllocator.allocateWork(fetchLatestOffsetAndCreateEtlRequests, jobContext);
        } catch (Exception e) {
            log.error("Unable to pull requests from Kafka brokers. Exiting the program", e);
            throw new IOException("Unable to pull requests from Kafka brokers.", e);
        }
    }

    private Set<String> getMoveToLatestTopicsSet(JobContext jobContext) {
        HashSet hashSet = new HashSet();
        String[] moveToLatestTopics = getMoveToLatestTopics(jobContext);
        if (moveToLatestTopics != null) {
            for (String str : moveToLatestTopics) {
                hashSet.add(str);
            }
        }
        return hashSet;
    }

    private boolean createMessageDecoder(JobContext jobContext, String str) {
        try {
            MessageDecoderFactory.createMessageDecoder(jobContext, str);
            return true;
        } catch (Exception e) {
            log.error("failed to create decoder", e);
            return false;
        }
    }

    private void writePrevious(Collection<EtlKey> collection, JobContext jobContext) throws IOException {
        FileSystem fileSystem = FileSystem.get(jobContext.getConfiguration());
        Path outputPath = FileOutputFormat.getOutputPath(jobContext);
        if (fileSystem.exists(outputPath)) {
            fileSystem.mkdirs(outputPath);
        }
        SequenceFile.Writer createWriter = SequenceFile.createWriter(fileSystem, jobContext.getConfiguration(), new Path(outputPath, "offsets-previous"), EtlKey.class, NullWritable.class);
        Iterator<EtlKey> it = collection.iterator();
        while (it.hasNext()) {
            createWriter.append(it.next(), NullWritable.get());
        }
        createWriter.close();
    }

    protected void writeRequests(List<CamusRequest> list, JobContext jobContext) throws IOException {
        FileSystem fileSystem = FileSystem.get(jobContext.getConfiguration());
        Path outputPath = FileOutputFormat.getOutputPath(jobContext);
        if (fileSystem.exists(outputPath)) {
            fileSystem.mkdirs(outputPath);
        }
        SequenceFile.Writer createWriter = SequenceFile.createWriter(fileSystem, jobContext.getConfiguration(), new Path(outputPath, EtlMultiOutputFormat.REQUESTS_FILE), EtlRequest.class, NullWritable.class);
        Iterator<CamusRequest> it = list.iterator();
        while (it.hasNext()) {
            createWriter.append(it.next(), NullWritable.get());
        }
        createWriter.close();
    }

    private Map<CamusRequest, EtlKey> getPreviousOffsets(Path[] pathArr, JobContext jobContext) throws IOException {
        HashMap hashMap = new HashMap();
        for (Path path : pathArr) {
            FileSystem fileSystem = path.getFileSystem(jobContext.getConfiguration());
            for (FileStatus fileStatus : fileSystem.listStatus(path, new OffsetFileFilter())) {
                log.info("previous offset file:" + fileStatus.getPath().toString());
                SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, fileStatus.getPath(), jobContext.getConfiguration());
                EtlKey etlKey = new EtlKey();
                while (true) {
                    EtlKey etlKey2 = etlKey;
                    if (reader.next(etlKey2, NullWritable.get())) {
                        EtlRequest etlRequest = new EtlRequest(jobContext, etlKey2.getTopic(), etlKey2.getLeaderId(), etlKey2.getPartition());
                        if (!hashMap.containsKey(etlRequest)) {
                            hashMap.put(etlRequest, etlKey2);
                        } else if (((EtlKey) hashMap.get(etlRequest)).getOffset() < etlKey2.getOffset()) {
                            hashMap.put(etlRequest, etlKey2);
                        }
                        etlKey = new EtlKey();
                    }
                }
                reader.close();
            }
        }
        return hashMap;
    }

    /* JADX WARN: Code restructure failed: missing block: B:22:0x00cf, code lost:
    
        if (r12 != false) goto L36;
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x00d2, code lost:
    
        com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.log.error("No matching partition found in the topicMetadata for Partition: " + r6.partitionId());
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public kafka.javaapi.PartitionMetadata refreshPartitionMetadataOnLeaderNotAvailable(kafka.javaapi.PartitionMetadata r6, kafka.javaapi.TopicMetadata r7, org.apache.hadoop.mapreduce.JobContext r8, int r9) throws java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 247
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.refreshPartitionMetadataOnLeaderNotAvailable(kafka.javaapi.PartitionMetadata, kafka.javaapi.TopicMetadata, org.apache.hadoop.mapreduce.JobContext, int):kafka.javaapi.PartitionMetadata");
    }

    public static void setWorkAllocator(JobContext jobContext, Class<WorkAllocator> cls) {
        jobContext.getConfiguration().setClass(CAMUS_WORK_ALLOCATOR_CLASS, cls, WorkAllocator.class);
    }

    public static WorkAllocator getWorkAllocator(JobContext jobContext) {
        try {
            return (WorkAllocator) jobContext.getConfiguration().getClass(CAMUS_WORK_ALLOCATOR_CLASS, Class.forName(CAMUS_WORK_ALLOCATOR_DEFAULT)).newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void setMoveToLatestTopics(JobContext jobContext, String str) {
        jobContext.getConfiguration().set(KAFKA_MOVE_TO_LAST_OFFSET_LIST, str);
    }

    public static String[] getMoveToLatestTopics(JobContext jobContext) {
        return jobContext.getConfiguration().getStrings(KAFKA_MOVE_TO_LAST_OFFSET_LIST);
    }

    public static void setKafkaClientBufferSize(JobContext jobContext, int i) {
        jobContext.getConfiguration().setInt(KAFKA_CLIENT_BUFFER_SIZE, i);
    }

    public static int getKafkaClientBufferSize(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_CLIENT_BUFFER_SIZE, 2097152);
    }

    public static void setKafkaClientTimeout(JobContext jobContext, int i) {
        jobContext.getConfiguration().setInt(KAFKA_CLIENT_SO_TIMEOUT, i);
    }

    public static int getKafkaClientTimeout(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_CLIENT_SO_TIMEOUT, 60000);
    }

    public static void setKafkaMaxPullHrs(JobContext jobContext, int i) {
        jobContext.getConfiguration().setInt(KAFKA_MAX_PULL_HRS, i);
    }

    public static int getKafkaMaxPullHrs(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_MAX_PULL_HRS, -1);
    }

    public static void setKafkaMaxPullMinutesPerTask(JobContext jobContext, int i) {
        jobContext.getConfiguration().setInt(KAFKA_MAX_PULL_MINUTES_PER_TASK, i);
    }

    public static int getKafkaMaxPullMinutesPerTask(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_MAX_PULL_MINUTES_PER_TASK, -1);
    }

    public static void setKafkaMaxHistoricalDays(JobContext jobContext, int i) {
        jobContext.getConfiguration().setInt(KAFKA_MAX_HISTORICAL_DAYS, i);
    }

    public static int getKafkaMaxHistoricalDays(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_MAX_HISTORICAL_DAYS, -1);
    }

    public static void setKafkaBlacklistTopic(JobContext jobContext, String str) {
        jobContext.getConfiguration().set(KAFKA_BLACKLIST_TOPIC, str);
    }

    public static String[] getKafkaBlacklistTopic(JobContext jobContext) {
        return getKafkaBlacklistTopic(jobContext.getConfiguration());
    }

    public static String[] getKafkaBlacklistTopic(Configuration configuration) {
        String str = configuration.get(KAFKA_BLACKLIST_TOPIC);
        return (str == null || str.isEmpty()) ? new String[0] : configuration.getStrings(KAFKA_BLACKLIST_TOPIC);
    }

    public static void setKafkaWhitelistTopic(JobContext jobContext, String str) {
        jobContext.getConfiguration().set(KAFKA_WHITELIST_TOPIC, str);
    }

    public static String[] getKafkaWhitelistTopic(JobContext jobContext) {
        return getKafkaWhitelistTopic(jobContext.getConfiguration());
    }

    public static String[] getKafkaWhitelistTopic(Configuration configuration) {
        String str = configuration.get(KAFKA_WHITELIST_TOPIC);
        return (str == null || str.isEmpty()) ? new String[0] : configuration.getStrings(KAFKA_WHITELIST_TOPIC);
    }

    public static void setEtlIgnoreSchemaErrors(JobContext jobContext, boolean z) {
        jobContext.getConfiguration().setBoolean(ETL_IGNORE_SCHEMA_ERRORS, z);
    }

    public static boolean getEtlIgnoreSchemaErrors(JobContext jobContext) {
        return jobContext.getConfiguration().getBoolean(ETL_IGNORE_SCHEMA_ERRORS, false);
    }

    public static void setEtlAuditIgnoreServiceTopicList(JobContext jobContext, String str) {
        jobContext.getConfiguration().set(ETL_AUDIT_IGNORE_SERVICE_TOPIC_LIST, str);
    }

    public static String[] getEtlAuditIgnoreServiceTopicList(JobContext jobContext) {
        return jobContext.getConfiguration().getStrings(ETL_AUDIT_IGNORE_SERVICE_TOPIC_LIST, new String[]{SequenceFileRecordWriterProvider.DEFAULT_RECORD_DELIMITER});
    }

    public static void setMessageDecoderClass(JobContext jobContext, Class<MessageDecoder> cls) {
        jobContext.getConfiguration().setClass(CAMUS_MESSAGE_DECODER_CLASS, cls, MessageDecoder.class);
    }

    public static Class<MessageDecoder> getMessageDecoderClass(JobContext jobContext) {
        return jobContext.getConfiguration().getClass(CAMUS_MESSAGE_DECODER_CLASS, KafkaAvroMessageDecoder.class);
    }

    public static Class<MessageDecoder> getMessageDecoderClass(JobContext jobContext, String str) {
        Class<MessageDecoder> cls = jobContext.getConfiguration().getClass("camus.message.decoder.class." + str, (Class) null);
        return cls == null ? getMessageDecoderClass(jobContext) : cls;
    }
}
