package com.linkedin.camus.etl.kafka;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.linkedin.camus.etl.kafka.common.DateUtils;
import com.linkedin.camus.etl.kafka.common.EmailClient;
import com.linkedin.camus.etl.kafka.common.EtlCounts;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
import com.linkedin.camus.etl.kafka.common.Source;
import com.linkedin.camus.etl.kafka.mapred.EtlInputFormat;
import com.linkedin.camus.etl.kafka.mapred.EtlMapper;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import com.linkedin.camus.etl.kafka.mapred.EtlRecordReader;
import com.linkedin.camus.etl.kafka.reporter.BaseReporter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/CamusJob.class */
/**
 * Driver for a Camus ETL run.
 *
 * <p>It builds a Hadoop {@link Job} from job properties, prunes old execution directories, points the job at
 * the most recent execution found in the history directory, runs the job, and then validates and commits the
 * result (error and skip-rate checks, tracking counts, move of the new execution into the history directory).
 */
public class CamusJob extends Configured implements Tool {
    /** Directory under which each run writes its new, timestamp-named execution output. */
    public static final String ETL_EXECUTION_BASE_PATH = "etl.execution.base.path";
    /** Directory into which finished executions are moved; the newest entry is the next run's input. */
    public static final String ETL_EXECUTION_HISTORY_PATH = "etl.execution.history.path";
    /** Destination directory for count files when {@link #ETL_KEEP_COUNT_FILES} is "true". */
    public static final String ETL_COUNTS_PATH = "etl.counts.path";
    /** Class used to post tracking counts; must have a constructor taking an {@link EtlCounts}. */
    public static final String ETL_COUNTS_CLASS = "etl.counts.class";
    public static final String ETL_COUNTS_CLASS_DEFAULT = "com.linkedin.camus.etl.kafka.common.EtlCounts";
    /** When "true", count files are moved to {@link #ETL_COUNTS_PATH} instead of being deleted. */
    public static final String ETL_KEEP_COUNT_FILES = "etl.keep.count.files";
    /** Explicit file+directory count limit for the execution base path (overrides the quota-derived limit). */
    public static final String ETL_BASEDIR_QUOTA_OVERIDE = "etl.basedir.quota.overide";
    /** Fraction of the base path's HDFS quota that executions may use before old ones are pruned. */
    public static final String ETL_EXECUTION_HISTORY_MAX_OF_QUOTA = "etl.execution.history.max.of.quota";
    /** When "true", errors found in the execution's error files fail the run. */
    public static final String ETL_FAIL_ON_ERRORS = "etl.fail.on.errors";
    /** When "true", out-of-range Kafka offsets reported during the run fail it. */
    public static final String ETL_FAIL_ON_OFFSET_OUTOFRANGE = "etl.fail.on.offset.outofrange";
    /** Maximum allowed percentage (0-100) of messages skipped because their schema was not found. */
    public static final String ETL_MAX_PERCENT_SKIPPED_SCHEMANOTFOUND = "etl.max.percent.skipped.schemanotfound";
    public static final String ETL_MAX_PERCENT_SKIPPED_SCHEMANOTFOUND_DEFAULT = "0.1";
    /** Maximum allowed percentage (0-100) of messages skipped for any other reason. */
    public static final String ETL_MAX_PERCENT_SKIPPED_OTHER = "etl.max.percent.skipped.other";
    public static final String ETL_MAX_PERCENT_SKIPPED_OTHER_DEFAULT = "0.1";
    /** Maximum number of errors printed per error file. */
    public static final String ETL_MAX_ERRORS_TO_PRINT_FROM_FILE = "etl.max.errors.to.print.from.file";
    public static final String ETL_MAX_ERRORS_TO_PRINT_FROM_FILE_DEFAULT = "10";
    public static final String ZK_AUDIT_HOSTS = "zookeeper.audit.hosts";
    public static final String KAFKA_MONITOR_TIER = "kafka.monitor.tier";
    public static final String CAMUS_MESSAGE_ENCODER_CLASS = "camus.message.encoder.class";
    public static final String BROKER_URI_FILE = "brokers.uri";
    public static final String POST_TRACKING_COUNTS_TO_KAFKA = "post.tracking.counts.to.kafka";
    public static final String KAFKA_FETCH_REQUEST_MAX_WAIT = "kafka.fetch.request.max.wait";
    public static final String KAFKA_FETCH_REQUEST_MIN_BYTES = "kafka.fetch.request.min.bytes";
    public static final String KAFKA_FETCH_REQUEST_CORRELATION_ID = "kafka.fetch.request.correlationid";
    public static final String KAFKA_CLIENT_NAME = "kafka.client.name";
    public static final String KAFKA_FETCH_BUFFER_SIZE = "kafka.fetch.buffer.size";
    public static final String KAFKA_BROKERS = "kafka.brokers";
    /** Deprecated in favour of {@link #KAFKA_BROKERS}. */
    public static final String KAFKA_HOST_URL = "kafka.host.url";
    /** Deprecated in favour of {@link #KAFKA_BROKERS}. */
    public static final String KAFKA_HOST_PORT = "kafka.host.port";
    public static final String KAFKA_TIMEOUT_VALUE = "kafka.timeout.value";
    public static final String CAMUS_REPORTER_CLASS = "etl.reporter.class";
    /** When true, log4j is reconfigured from "log4j.xml" at the start of a run. */
    public static final String LOG4J_CONFIGURATION = "log4j.configuration";

    /** Shared logger; assigned by every constructor (so it is null until a CamusJob has been created). */
    private static Logger log;
    /** The Hadoop job of the current run, used by {@link #cancel()}; null before a run starts. */
    private Job hadoopJob;
    private final Properties props;
    /** Formatter for execution directory names; also used to recognise them when pruning. */
    private final DateTimeFormatter dateFmt;
    public static final String ETL_FAIL_ON_OFFSET_OUTOFRANGE_DEFAULT = Boolean.TRUE.toString();
    /** Accumulated timers keyed by phase name (see {@link #startTiming}); not synchronized. */
    private static final HashMap<String, Long> timingMap = new HashMap<>();

    /** Pattern of the timestamp used to name execution directories. */
    private static final String EXECUTION_DIR_PATTERN = "YYYY-MM-dd-HH-mm-ss";

    /**
     * Orders file statuses lexicographically by the last component of their path. Execution directory
     * names use the fixed-width {@link #EXECUTION_DIR_PATTERN}, so for those names this is chronological order.
     */
    private static final Comparator<FileStatus> BY_PATH_NAME = new Comparator<FileStatus>() {
        @Override
        public int compare(FileStatus first, FileStatus second) {
            return first.getPath().getName().compareTo(second.getPath().getName());
        }
    };

    /** {@link PathFilter} that accepts paths whose last component starts with a fixed prefix. */
    public class PrefixFilter implements PathFilter {
        private final String prefix;

        /**
         * @param str the required file-name prefix
         */
        public PrefixFilter(String str) {
            this.prefix = str;
        }

        @Override
        public boolean accept(Path path) {
            return path.getName().startsWith(this.prefix);
        }
    }

    /** Creates a job with empty properties (they are normally filled in by {@link #run(String[])}). */
    public CamusJob() throws IOException {
        this(new Properties());
    }

    /** Creates a job with the given properties, logging through this class's log4j logger. */
    public CamusJob(Properties properties) throws IOException {
        this(properties, Logger.getLogger(CamusJob.class));
    }

    /**
     * Creates a job with the given properties and logger.
     *
     * @param properties job properties; the instance keeps and mutates this object
     * @param logger     logger to use; note it is stored in a static field shared by all instances
     */
    public CamusJob(Properties properties, Logger logger) throws IOException {
        this.hadoopJob = null;
        this.dateFmt = DateUtils.getDateTimeFormatter(EXECUTION_DIR_PATTERN, DateTimeZone.UTC);
        this.props = properties;
        log = logger;
    }

    /**
     * Starts (or resumes) the named timer.
     *
     * <p>Each timer stores (sum of stop times - sum of start times), so every start/stop pair adds its
     * elapsed milliseconds.
     *
     * @param str timer name
     */
    public static void startTiming(String str) {
        addToTimer(str, -System.currentTimeMillis());
    }

    /**
     * Stops the named timer by adding the current time to it; pairs with {@link #startTiming}.
     *
     * @param str timer name
     */
    public static void stopTiming(String str) {
        addToTimer(str, System.currentTimeMillis());
    }

    /**
     * Adds the current time to the named timer; this is arithmetically identical to {@link #stopTiming}.
     *
     * @param str timer name
     */
    public static void setTime(String str) {
        addToTimer(str, System.currentTimeMillis());
    }

    /** Adds {@code delta} to the named timer, treating a missing timer as 0. */
    private static void addToTimer(String name, long delta) {
        Long current = timingMap.get(name);
        timingMap.put(name, (current == null ? 0L : current.longValue()) + delta);
    }

    /** Creates the Hadoop job from {@code properties}, remembers it for {@link #cancel()}, and returns it. */
    private Job createJob(Properties properties) throws IOException {
        if (getConf() == null) {
            setConf(new Configuration());
        }
        populateConf(properties, getConf(), log);
        Job job = new Job(getConf());
        job.setJarByClass(CamusJob.class);

        String jobName = job.getConfiguration().get("camus.job.name");
        job.setJobName(jobName != null ? jobName : "Camus Job");

        // Forward delegation tokens when running under a secured launcher.
        String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        if (tokenFile != null) {
            job.getConfiguration().set("mapreduce.job.credentials.binary", tokenFile);
        }
        this.hadoopJob = job;
        return job;
    }

    /**
     * Copies all {@code properties} into {@code configuration}. It then adds jars to the distributed-cache
     * classpath from two sources: the files directly inside {@code hdfs.default.classpath.dir}, and the
     * comma-separated {@code hadoop.external.jarFiles}. Jars whose file name fully matches any regex in
     * {@code cache.jar.filter.list} are skipped.
     *
     * @param properties    source properties
     * @param configuration configuration to populate
     * @param logger        logger for the jars being considered
     * @throws IOException on file system errors
     */
    public static void populateConf(Properties properties, Configuration configuration, Logger logger) throws IOException {
        for (Object key : properties.keySet()) {
            String name = key.toString();
            configuration.set(name, properties.getProperty(name));
        }
        FileSystem fileSystem = FileSystem.get(configuration);

        List<Pattern> jarFilters = new ArrayList<>();
        String[] filterRegexes = configuration.getStrings("cache.jar.filter.list", new String[0]);
        // getStrings returns null (not the default) when the property is set to an empty string.
        if (filterRegexes != null) {
            for (String regex : filterRegexes) {
                jarFilters.add(Pattern.compile(regex));
            }
        }

        String classpathDir = configuration.get("hdfs.default.classpath.dir", (String) null);
        if (classpathDir != null) {
            FileStatus[] entries = fileSystem.listStatus(new Path(classpathDir));
            if (entries != null) {
                for (FileStatus entry : entries) {
                    if (entry.isDir()) {
                        continue;
                    }
                    Path jarPath = entry.getPath();
                    // Logged before filtering, so filtered jars are reported here too.
                    logger.info("Adding Jar to Distributed Cache Archive File:" + jarPath);
                    if (!matchesAny(jarFilters, jarPath.getName())) {
                        DistributedCache.addFileToClassPath(jarPath, configuration, fileSystem);
                    }
                }
            } else {
                System.out.println("hdfs.default.classpath.dir " + classpathDir + " is empty.");
            }
        }

        String externalJars = configuration.get("hadoop.external.jarFiles", (String) null);
        if (externalJars != null) {
            for (String jar : externalJars.split(",")) {
                logger.info("Adding external jar File:" + jar);
                Path jarPath = new Path(jar);
                if (!matchesAny(jarFilters, jarPath.getName())) {
                    DistributedCache.addFileToClassPath(jarPath, configuration, fileSystem);
                }
            }
        }
    }

    /** Returns true when {@code fileName} fully matches at least one of {@code patterns}. */
    private static boolean matchesAny(List<Pattern> patterns, String fileName) {
        for (Pattern pattern : patterns) {
            if (pattern.matcher(fileName).matches()) {
                return true;
            }
        }
        return false;
    }

    /** Runs the job with the default {@link EtlInputFormat} / {@link EtlMultiOutputFormat}. */
    public void run() throws Exception {
        run(EtlInputFormat.class, EtlMultiOutputFormat.class);
    }

    /**
     * Runs one complete ETL execution: setup and pruning, the Hadoop job, then validation and commit.
     *
     * @param cls  input format class for the job
     * @param cls2 output format class for the job
     * @throws RuntimeException if the Hadoop job fails, too many messages were skipped, or a configured
     *                          failure condition (errors, offset/leader problems) was observed
     * @throws Exception        on any other setup, I/O or reflection failure
     */
    public void run(Class<? extends InputFormat> cls, Class<? extends OutputFormat> cls2) throws Exception {
        startTiming("pre-setup");
        startTiming("total");
        EmailClient.setup(this.props);
        Job job = createJob(this.props);
        if (getLog4jConfigure(job)) {
            DOMConfigurator.configure("log4j.xml");
        }
        FileSystem fs = FileSystem.get(job.getConfiguration());
        log.info("Dir Destination set to: " + EtlMultiOutputFormat.getDestinationPath(job));

        Path executionBasePath = new Path(this.props.getProperty(ETL_EXECUTION_BASE_PATH));
        Path executionHistoryPath = new Path(this.props.getProperty(ETL_EXECUTION_HISTORY_PATH));
        ensureDirectory(fs, executionBasePath, "The execution base path does not exist. Creating the directory");
        ensureDirectory(fs, executionHistoryPath, "The history base path does not exist. Creating the directory.");

        Path previousExecution = pruneExecutions(job, fs, executionBasePath, executionHistoryPath);
        if (previousExecution != null) {
            FileInputFormat.setInputPaths(job, new Path[]{previousExecution});
            log.info("Previous execution: " + previousExecution.toString());
        } else {
            System.out.println("No previous execution, all topics pulled from earliest available offset");
        }

        String executionName = new DateTime().toString(DateUtils.getDateTimeFormatter(EXECUTION_DIR_PATTERN, DateTimeZone.UTC));
        Path newExecutionOutput = new Path(executionBasePath, executionName);
        FileOutputFormat.setOutputPath(job, newExecutionOutput);
        log.info("New execution temp location: " + newExecutionOutput.toString());

        EtlInputFormat.setLogger(log);
        job.setMapperClass(EtlMapper.class);
        job.setInputFormatClass(cls);
        job.setOutputFormatClass(cls2);
        job.setNumReduceTasks(0);
        stopTiming("pre-setup");
        job.submit();
        job.waitForCompletion(true);

        Counters counters = job.getCounters();
        logCounters(counters);
        // NOTE(review): no startTiming("hadoop") in this class; presumably it is started elsewhere (e.g. the input format).
        stopTiming("hadoop");
        startTiming("commit");

        boolean sawErrors = checkExecutionErrors(fs, newExecutionOutput);
        checkIfTooManySkippedMsg(counters);
        @SuppressWarnings("unchecked")
        Class<? extends EtlCounts> countsClass =
                (Class<? extends EtlCounts>) Class.forName(this.props.getProperty(ETL_COUNTS_CLASS, ETL_COUNTS_CLASS_DEFAULT));
        sendTrackingCounts(job, fs, newExecutionOutput, countsClass);

        Path historyOutput = new Path(executionHistoryPath, executionName);
        log.info("Moving execution to history : " + historyOutput);
        // rename reports failure via its return value; if this move fails, the next run will not see this execution.
        if (!fs.rename(newExecutionOutput, historyOutput)) {
            log.error("Failed to move execution " + newExecutionOutput + " to history " + historyOutput);
        }
        log.info("Job finished");
        stopTiming("commit");
        stopTiming("total");
        createReport(job, timingMap);

        if (!job.isSuccessful()) {
            reportFailedMapTasks(job);
            throw new RuntimeException("hadoop job failed");
        }
        failOnReportedProblems(sawErrors);
    }

    /** Creates {@code dir} (logging {@code message}) when it does not exist yet. */
    private static void ensureDirectory(FileSystem fs, Path dir, String message) throws IOException {
        if (!fs.exists(dir)) {
            log.info(message);
            fs.mkdirs(dir);
        }
    }

    /**
     * Deletes executions while the file+directory count under {@code basePath} exceeds the allowed limit.
     * It first deletes the oldest history entries (always keeping the newest), then date-named, uncommitted
     * directories left directly under {@code basePath}. The limit is a fraction of the base path's HDFS
     * quota, defaults to 50000, and can be overridden by {@link #ETL_BASEDIR_QUOTA_OVERIDE}.
     * NOTE(review): counting against basePath while deleting history assumes history lives under basePath.
     *
     * @return the newest history entry (the previous execution), or null if the history is empty
     */
    private Path pruneExecutions(Job job, FileSystem fs, Path basePath, Path historyPath) throws IOException {
        ContentSummary baseSummary = fs.getContentSummary(basePath);
        float maxOfQuota = job.getConfiguration().getFloat(ETL_EXECUTION_HISTORY_MAX_OF_QUOTA, 0.5f);
        long limit = (long) (baseSummary.getQuota() * maxOfQuota);
        if (limit == 0) {
            limit = 50000L;
        }
        if (this.props.containsKey(ETL_BASEDIR_QUOTA_OVERIDE)) {
            limit = Long.parseLong(this.props.getProperty(ETL_BASEDIR_QUOTA_OVERIDE));
        }
        long usedCount = baseSummary.getFileCount() + baseSummary.getDirectoryCount();

        FileStatus[] history = fs.listStatus(historyPath);
        if (history == null) {
            history = new FileStatus[0];
        }
        Arrays.sort(history, BY_PATH_NAME);
        // Stop one short of the end so the newest execution survives to seed this run.
        for (int i = 0; i < history.length - 1 && limit < usedCount; i++) {
            Path oldExecution = history[i].getPath();
            log.info("removing old execution: " + oldExecution.getName());
            usedCount -= countEntries(fs, oldExecution);
            fs.delete(oldExecution, true);
        }

        if (limit < usedCount) {
            FileStatus[] failedExecutions = fs.listStatus(basePath, new PathFilter() {
                @Override
                public boolean accept(Path candidate) {
                    try {
                        dateFmt.parseDateTime(candidate.getName());
                        return true;
                    } catch (IllegalArgumentException ignored) {
                        // Not an execution directory name; never delete it.
                        return false;
                    }
                }
            });
            if (failedExecutions == null) {
                failedExecutions = new FileStatus[0];
            }
            Arrays.sort(failedExecutions, BY_PATH_NAME);
            for (int i = 0; i < failedExecutions.length && limit < usedCount; i++) {
                Path failedExecution = failedExecutions[i].getPath();
                log.info("removing failed execution: " + failedExecution.getName());
                usedCount -= countEntries(fs, failedExecution);
                fs.delete(failedExecution, true);
            }
        }
        return history.length > 0 ? history[history.length - 1].getPath() : null;
    }

    /** Returns the number of files plus directories under {@code path}. */
    private static long countEntries(FileSystem fs, Path path) throws IOException {
        ContentSummary summary = fs.getContentSummary(path);
        return summary.getFileCount() + summary.getDirectoryCount();
    }

    /** Logs every counter group and counter value of the finished job. */
    private void logCounters(Counters counters) {
        for (String groupName : counters.getGroupNames()) {
            CounterGroup group = counters.getGroup(groupName);
            log.info("Group: " + group.getDisplayName());
            for (Counter counter : group) {
                log.info(counter.getDisplayName() + ":\t" + counter.getValue());
            }
        }
    }

    /** Prints the diagnostics of every failed map task of {@code job} to stderr. */
    private void reportFailedMapTasks(Job job) throws IOException, InterruptedException {
        JobClient jobClient = new JobClient(new JobConf(job.getConfiguration()));
        TaskCompletionEvent[] events = job.getTaskCompletionEvents(0);
        if (events.length == 0) {
            return;
        }
        for (TaskReport report : jobClient.getMapTaskReports(events[0].getTaskAttemptId().getJobID())) {
            if (report.getCurrentStatus().equals(TIPStatus.FAILED)) {
                for (String diagnostic : report.getDiagnostics()) {
                    System.err.println("task error: " + diagnostic);
                }
            }
        }
    }

    /**
     * After a successful Hadoop job, throws if a configured failure condition was observed. Each static
     * EtlInputFormat failure flag is reset before it may throw, so it does not leak into a later run.
     */
    private void failOnReportedProblems(boolean sawErrors) {
        if (sawErrors && this.props.getProperty(ETL_FAIL_ON_ERRORS, Boolean.FALSE.toString()).equalsIgnoreCase(Boolean.TRUE.toString())) {
            throw new RuntimeException("Camus saw errors, check stderr");
        }
        if (EtlInputFormat.reportJobFailureDueToOffsetOutOfRange) {
            EtlInputFormat.reportJobFailureDueToOffsetOutOfRange = false;
            if (this.props.getProperty(ETL_FAIL_ON_OFFSET_OUTOFRANGE, ETL_FAIL_ON_OFFSET_OUTOFRANGE_DEFAULT).equalsIgnoreCase(Boolean.TRUE.toString())) {
                throw new RuntimeException("Some topics skipped due to offsets from Kafka metadata out of range.");
            }
        }
        if (EtlInputFormat.reportJobFailureUnableToGetOffsetFromKafka) {
            EtlInputFormat.reportJobFailureUnableToGetOffsetFromKafka = false;
            throw new RuntimeException("Some topics skipped due to failure in getting latest offset from Kafka leaders.");
        }
        if (EtlInputFormat.reportJobFailureDueToLeaderNotAvailable) {
            EtlInputFormat.reportJobFailureDueToLeaderNotAvailable = false;
            throw new RuntimeException("Some topic partitions skipped due to Kafka leader not available.");
        }
    }

    /**
     * Reads the KAFKA_MSG counters. Throws if the percentage of messages skipped for "schema not found" or
     * for "other" reasons exceeds its configured maximum.
     */
    private void checkIfTooManySkippedMsg(Counters counters) {
        double maxPercentSchemaNotFound = Double.parseDouble(
                this.props.getProperty(ETL_MAX_PERCENT_SKIPPED_SCHEMANOTFOUND, ETL_MAX_PERCENT_SKIPPED_SCHEMANOTFOUND_DEFAULT));
        double maxPercentOther = Double.parseDouble(
                this.props.getProperty(ETL_MAX_PERCENT_SKIPPED_OTHER, ETL_MAX_PERCENT_SKIPPED_OTHER_DEFAULT));
        long skippedSchemaNotFound = 0;
        long skippedOther = 0;
        long decodeSuccessful = 0;
        String messageGroup = EtlRecordReader.KAFKA_MSG.class.getName();
        for (String groupName : counters.getGroupNames()) {
            if (!groupName.equals(messageGroup)) {
                continue;
            }
            for (Counter counter : counters.getGroup(groupName)) {
                String name = counter.getDisplayName();
                if (name.equals(EtlRecordReader.KAFKA_MSG.DECODE_SUCCESSFUL.toString())) {
                    decodeSuccessful = counter.getValue();
                } else if (name.equals(EtlRecordReader.KAFKA_MSG.SKIPPED_SCHEMA_NOT_FOUND.toString())) {
                    skippedSchemaNotFound = counter.getValue();
                } else if (name.equals(EtlRecordReader.KAFKA_MSG.SKIPPED_OTHER.toString())) {
                    skippedOther = counter.getValue();
                }
            }
        }
        checkIfTooManySkippedMsg(maxPercentSchemaNotFound, skippedSchemaNotFound, decodeSuccessful, "schema not found");
        checkIfTooManySkippedMsg(maxPercentOther, skippedOther, decodeSuccessful, "other");
    }

    /**
     * Throws if {@code skipped / (skipped + decodeSuccessful) * 100} exceeds {@code maxPercentAllowed}.
     * Does nothing when both counts are zero.
     */
    private void checkIfTooManySkippedMsg(double maxPercentAllowed, long skipped, long decodeSuccessful, String reason) {
        if (skipped == 0 && decodeSuccessful == 0) {
            return;
        }
        // Floating-point division: integer division here would only ever yield 0% or 100%.
        double percentSkipped = (double) skipped / (double) (skipped + decodeSuccessful) * 100.0d;
        if (percentSkipped > maxPercentAllowed) {
            String message = "job failed: " + percentSkipped + "% messages skipped due to " + reason
                    + ", maximum allowed is " + maxPercentAllowed + "%";
            log.error(message);
            throw new RuntimeException(message);
        }
    }

    /** Kills the running Hadoop job, if one has been created. */
    public void cancel() throws IOException {
        if (this.hadoopJob != null) {
            this.hadoopJob.killJob();
        }
    }

    /**
     * Reads the error sequence files (prefix {@link EtlMultiOutputFormat#ERRORS_PREFIX}) in {@code path}.
     *
     * @return for each non-empty error file: up to {@link #ETL_MAX_ERRORS_TO_PRINT_FROM_FILE} (key, error)
     *         pairs, plus one "Too many errors" marker entry when that limit was exceeded
     */
    private Map<String, List<Pair<EtlKey, ExceptionWritable>>> readErrors(FileSystem fileSystem, Path path) throws IOException {
        int maxErrorsToPrint = Integer.parseInt(
                this.props.getProperty(ETL_MAX_ERRORS_TO_PRINT_FROM_FILE, ETL_MAX_ERRORS_TO_PRINT_FROM_FILE_DEFAULT));
        Map<String, List<Pair<EtlKey, ExceptionWritable>>> errorsByFile = Maps.newHashMap();
        for (FileStatus fileStatus : fileSystem.listStatus(path, new PrefixFilter(EtlMultiOutputFormat.ERRORS_PREFIX))) {
            Path errorFile = fileStatus.getPath();
            // The reader refills these two objects on every next() call, so each kept entry is copied.
            EtlKey key = new EtlKey();
            ExceptionWritable error = new ExceptionWritable();
            List<Pair<EtlKey, ExceptionWritable>> errors = Lists.newArrayList();
            int errorCount = 0;
            SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, errorFile, fileSystem.getConf());
            try {
                while (reader.next(key, error)) {
                    errorCount++;
                    if (errorCount <= maxErrorsToPrint) {
                        errors.add(new Pair<>(new EtlKey(key), new ExceptionWritable(error.toString())));
                    }
                }
            } finally {
                reader.close();
            }
            if (errorCount > 0) {
                if (errorCount > maxErrorsToPrint) {
                    errors.add(new Pair<>(new EtlKey(key), new ExceptionWritable(
                            "... Too many errors to show. Skipped " + (errorCount - maxErrorsToPrint) + " ...")));
                }
                errorsByFile.put(errorFile.toString(), errors);
            }
        }
        return errorsByFile;
    }

    /**
     * Logs the errors recorded in the execution at {@code path}.
     *
     * @return true if any error file contained at least one error
     */
    private boolean checkExecutionErrors(FileSystem fileSystem, Path path) throws IOException {
        Map<String, List<Pair<EtlKey, ExceptionWritable>>> errorsByFile = readErrors(fileSystem, path);
        if (errorsByFile.isEmpty()) {
            return false;
        }
        log.error("Errors encountered during job run:");
        for (Map.Entry<String, List<Pair<EtlKey, ExceptionWritable>>> entry : errorsByFile.entrySet()) {
            List<Pair<EtlKey, ExceptionWritable>> errors = entry.getValue();
            if (!errors.isEmpty()) {
                log.error("Errors from file [" + entry.getKey() + "]");
            }
            for (Pair<EtlKey, ExceptionWritable> error : errors) {
                log.error("Error for EtlKey [" + error.getKey() + "]: " + error.getValue().toString());
            }
        }
        return true;
    }

    /**
     * When tracking posts are enabled, this method runs three steps:
     * <ol>
     *   <li>merges the per-task JSON count files (prefix {@link EtlMultiOutputFormat#COUNTS_PREFIX}) in
     *       {@code path} per topic;</li>
     *   <li>moves the files to {@link #ETL_COUNTS_PATH} or deletes them;</li>
     *   <li>posts each topic's merged counts through an instance of {@code cls}.</li>
     * </ol>
     *
     * @param jobContext context supplying the configuration and Kafka brokers
     * @param fileSystem file system holding the execution
     * @param path       execution output directory
     * @param cls        counts class; must declare a constructor taking an {@link EtlCounts}
     */
    public void sendTrackingCounts(JobContext jobContext, FileSystem fileSystem, Path path, Class<? extends EtlCounts> cls) throws IOException, URISyntaxException, IllegalArgumentException, SecurityException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
        if (!EtlMultiOutputFormat.isRunTrackingPost(jobContext)) {
            return;
        }
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        Map<String, EtlCounts> countsByTopic = new HashMap<>();
        for (FileStatus countFile : fileSystem.listStatus(path, new PrefixFilter(EtlMultiOutputFormat.COUNTS_PREFIX))) {
            String json = readLinesJoined(fileSystem, countFile.getPath());
            List<EtlCounts> fileCounts = objectMapper.readValue(json, new TypeReference<ArrayList<EtlCounts>>() {
            });
            for (EtlCounts counts : fileCounts) {
                mergeCounts(countsByTopic, counts);
            }
        }

        boolean keepCountFiles = this.props.getProperty(ETL_KEEP_COUNT_FILES, "false").equals("true");
        for (FileStatus countFile : fileSystem.listStatus(path, new PrefixFilter(EtlMultiOutputFormat.COUNTS_PREFIX))) {
            if (keepCountFiles) {
                fileSystem.rename(countFile.getPath(), new Path(this.props.getProperty(ETL_COUNTS_PATH), countFile.getPath().getName()));
            } else {
                fileSystem.delete(countFile.getPath(), true);
            }
        }

        String kafkaBrokers = getKafkaBrokers(jobContext);
        for (EtlCounts counts : countsByTopic.values()) {
            cls.getDeclaredConstructor(EtlCounts.class).newInstance(counts)
                    .postTrackingCountToKafka(jobContext.getConfiguration(), this.props.getProperty(KAFKA_MONITOR_TIER), kafkaBrokers);
        }
    }

    /** Reads {@code file} line by line and returns the lines concatenated without separators. */
    private static String readLinesJoined(FileSystem fileSystem, Path file) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(file)), 1048576)) {
            StringBuilder content = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                content.append(line);
            }
            return content.toString();
        }
    }

    /**
     * Folds {@code incoming} into the entry for its topic.
     *
     * <p>If the topic has no entry yet, {@code incoming} is stored as-is. Otherwise:
     * <ul>
     *   <li>the time range is widened (min of the start/first values, max of the end/last values);</li>
     *   <li>error counts are summed;</li>
     *   <li>granularity and topic are taken from {@code incoming};</li>
     *   <li>per-source counts are summed, keyed by the source's string form.</li>
     * </ul>
     */
    private static void mergeCounts(Map<String, EtlCounts> countsByTopic, EtlCounts incoming) {
        String topic = incoming.getTopic();
        EtlCounts merged = countsByTopic.get(topic);
        if (merged == null) {
            countsByTopic.put(topic, incoming);
            return;
        }
        merged.setEndTime(Math.max(merged.getEndTime(), incoming.getEndTime()));
        merged.setLastTimestamp(Math.max(merged.getLastTimestamp(), incoming.getLastTimestamp()));
        merged.setStartTime(Math.min(merged.getStartTime(), incoming.getStartTime()));
        merged.setFirstTimestamp(Math.min(merged.getFirstTimestamp(), incoming.getFirstTimestamp()));
        merged.setErrorCount(merged.getErrorCount() + incoming.getErrorCount());
        merged.setGranularity(incoming.getGranularity());
        merged.setTopic(incoming.getTopic());
        for (Source source : incoming.getCounts().values()) {
            String sourceKey = source.toString();
            if (merged.getCounts().containsKey(sourceKey)) {
                Source existing = merged.getCounts().get(sourceKey);
                existing.setCount(existing.getCount() + source.getCount());
                merged.getCounts().put(existing.toString(), existing);
            } else {
                merged.getCounts().put(sourceKey, source);
            }
        }
    }

    /** Instantiates the configured {@link BaseReporter} and reports the job with the collected timings. */
    private void createReport(Job job, Map<String, Long> map) throws IOException, ClassNotFoundException {
        BaseReporter reporter = (BaseReporter) ReflectionUtils.newInstance(
                job.getConfiguration().getClassByName(getReporterClass(job)), job.getConfiguration());
        reporter.report(job, map);
    }

    /** Command-line entry point; see {@link #run(String[])} for the accepted options. */
    public static void main(String[] strArr) throws Exception {
        ToolRunner.run(new CamusJob(), strArr);
    }

    /**
     * Parses the command line, loads properties, and runs the job. The options are:
     * <ul>
     *   <li>{@code -p} a classpath resource;</li>
     *   <li>{@code -P} an {@code hdfs:} or local file;</li>
     *   <li>{@code -D property=value} overrides, applied last.</li>
     * </ul>
     *
     * @return 1 (after printing usage) when neither -p nor -P is given, otherwise 0 after a completed run
     * @throws IllegalArgumentException if the -p resource is not on the classpath
     */
    public int run(String[] strArr) throws Exception {
        Options options = new Options();
        options.addOption("p", true, "properties filename from the classpath");
        options.addOption("P", true, "external properties filename (hdfs: or local FS)");
        // OptionBuilder is configured through static calls that apply to the following create().
        OptionBuilder.withArgName("property=value");
        OptionBuilder.hasArgs(2);
        OptionBuilder.withValueSeparator();
        OptionBuilder.withDescription("use value for given property");
        options.addOption(OptionBuilder.create("D"));
        CommandLine commandLine = new PosixParser().parse(options, strArr);
        if (!commandLine.hasOption('p') && !commandLine.hasOption('P')) {
            new HelpFormatter().printHelp("CamusJob.java", options);
            return 1;
        }
        if (commandLine.hasOption('p')) {
            loadClasspathProperties(commandLine.getOptionValue('p'));
        }
        if (commandLine.hasOption('P')) {
            loadExternalProperties(commandLine.getOptionValue('P'));
        }
        this.props.putAll(commandLine.getOptionProperties("D"));
        run();
        return 0;
    }

    /** Loads properties from a classpath resource, failing clearly if it does not exist. */
    private void loadClasspathProperties(String resource) throws IOException {
        InputStream resourceStream = getClass().getClassLoader().getResourceAsStream(resource);
        if (resourceStream == null) {
            throw new IllegalArgumentException("Properties resource not found on the classpath: " + resource);
        }
        try (InputStream in = resourceStream) {
            this.props.load(in);
        }
    }

    /**
     * Loads properties from {@code location}. Locations starting with "hdfs:" are opened through the file
     * system that owns the path (so a non-default namenode also works); anything else is a local file.
     */
    private void loadExternalProperties(String location) throws IOException {
        InputStream source;
        if (location.startsWith("hdfs:")) {
            Path propertiesPath = new Path(location);
            source = propertiesPath.getFileSystem(new Configuration()).open(propertiesPath);
        } else {
            source = new FileInputStream(new File(location));
        }
        try (InputStream in = source) {
            this.props.load(in);
        }
    }

    /** @return {@link #POST_TRACKING_COUNTS_TO_KAFKA}, default true */
    public static boolean getPostTrackingCountsToKafka(Job job) {
        return job.getConfiguration().getBoolean(POST_TRACKING_COUNTS_TO_KAFKA, true);
    }

    /** @return {@link #KAFKA_FETCH_REQUEST_MIN_BYTES}, default 1024 */
    public static int getKafkaFetchRequestMinBytes(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_FETCH_REQUEST_MIN_BYTES, 1024);
    }

    /** @return {@link #KAFKA_FETCH_REQUEST_MAX_WAIT}, default 1000 */
    public static int getKafkaFetchRequestMaxWait(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_FETCH_REQUEST_MAX_WAIT, 1000);
    }

    /**
     * Returns {@link #KAFKA_BROKERS}. When it is unset, this falls back to the deprecated
     * {@link #KAFKA_HOST_URL}:{@link #KAFKA_HOST_PORT} (port default 10251) and logs a warning.
     *
     * @return the broker list, or null if neither form is configured
     */
    public static String getKafkaBrokers(JobContext jobContext) {
        Configuration conf = jobContext.getConfiguration();
        String brokers = conf.get(KAFKA_BROKERS);
        if (brokers != null) {
            return brokers;
        }
        String host = conf.get(KAFKA_HOST_URL);
        if (host == null) {
            return null;
        }
        // The shared logger is only set once a CamusJob has been constructed in this JVM.
        Logger logger = log != null ? log : Logger.getLogger(CamusJob.class);
        logger.warn("The configuration properties kafka.host.url and kafka.host.port are deprecated. Please switch to using kafka.brokers");
        return host + ":" + conf.getInt(KAFKA_HOST_PORT, 10251);
    }

    /** @return {@link #KAFKA_FETCH_REQUEST_CORRELATION_ID}, default -1 */
    public static int getKafkaFetchRequestCorrelationId(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_FETCH_REQUEST_CORRELATION_ID, -1);
    }

    /** @return {@link #KAFKA_CLIENT_NAME}, or null if unset */
    public static String getKafkaClientName(JobContext jobContext) {
        return jobContext.getConfiguration().get(KAFKA_CLIENT_NAME);
    }

    /** @return the raw string value of {@link #KAFKA_FETCH_BUFFER_SIZE}, or null if unset */
    public static String getKafkaFetchRequestBufferSize(JobContext jobContext) {
        return jobContext.getConfiguration().get(KAFKA_FETCH_BUFFER_SIZE);
    }

    /** @return {@link #KAFKA_TIMEOUT_VALUE}, default 30000 */
    public static int getKafkaTimeoutValue(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_TIMEOUT_VALUE, 30000);
    }

    /** @return {@link #KAFKA_FETCH_BUFFER_SIZE} as an int, default 1048576 */
    public static int getKafkaBufferSize(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_FETCH_BUFFER_SIZE, 1048576);
    }

    /** @return {@link #LOG4J_CONFIGURATION}, default false */
    public static boolean getLog4jConfigure(JobContext jobContext) {
        return jobContext.getConfiguration().getBoolean(LOG4J_CONFIGURATION, false);
    }

    /** @return {@link #CAMUS_REPORTER_CLASS}, default the TimeReporter class name */
    public static String getReporterClass(JobContext jobContext) {
        return jobContext.getConfiguration().get(CAMUS_REPORTER_CLASS, "com.linkedin.camus.etl.kafka.reporter.TimeReporter");
    }
}
