package com.linkedin.camus.etl.kafka;

import com.linkedin.camus.etl.kafka.common.DateUtils;
import com.linkedin.camus.etl.kafka.common.EtlCounts;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
import com.linkedin.camus.etl.kafka.common.Source;
import com.linkedin.camus.etl.kafka.mapred.EtlInputFormat;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/CamusJob.class */
/**
 * Hadoop {@link Tool} that drives one Camus ETL run.
 *
 * <p>A run performs these steps in order:
 * <ol>
 *   <li>Loads job properties.</li>
 *   <li>Prunes old execution directories from the history path.</li>
 *   <li>Runs a map-only job whose input is the most recent previous execution.</li>
 *   <li>Optionally posts per-topic tracking counts to Kafka.</li>
 *   <li>Prints error records.</li>
 *   <li>Moves the new execution directory into the history path.</li>
 *   <li>Logs a timing report.</li>
 * </ol>
 */
public class CamusJob extends Configured implements Tool {
    public static final String ETL_EXECUTION_BASE_PATH = "etl.execution.base.path";
    public static final String ETL_EXECUTION_HISTORY_PATH = "etl.execution.history.path";
    public static final String ETL_COUNTS_PATH = "etl.counts.path";
    public static final String ETL_KEEP_COUNT_FILES = "etl.keep.count.files";
    public static final String ETL_EXECUTION_HISTORY_MAX_OF_QUOTA = "etl.execution.history.max.of.quota";
    public static final String ZK_AUDIT_HOSTS = "zookeeper.audit.hosts";
    public static final String KAFKA_MONITOR_TIER = "kafka.monitor.tier";
    public static final String CAMUS_MESSAGE_ENCODER_CLASS = "camus.message.encoder.class";
    public static final String BROKER_URI_FILE = "brokers.uri";
    public static final String POST_TRACKING_COUNTS_TO_KAFKA = "post.tracking.counts.to.kafka";
    public static final String KAFKA_FETCH_REQUEST_MAX_WAIT = "kafka.fetch.request.max.wait";
    public static final String KAFKA_FETCH_REQUEST_MIN_BYTES = "kafka.fetch.request.min.bytes";
    public static final String KAFKA_FETCH_REQUEST_CORRELATION_ID = "kafka.fetch.request.correlationid";
    public static final String KAFKA_CLIENT_NAME = "kafka.client.name";
    public static final String KAFKA_FETCH_BUFFER_SIZE = "kafka.fetch.buffer.size";
    public static final String KAFKA_BROKERS = "kafka.brokers";
    public static final String KAFKA_HOST_URL = "kafka.host.url";
    public static final String KAFKA_HOST_PORT = "kafka.host.port";
    public static final String KAFKA_TIMEOUT_VALUE = "kafka.timeout.value";
    public static final String LOG4J_CONFIGURATION = "log4j.configuration";

    /** History size limit (files plus directories) used when no positive quota-derived limit is available. */
    private static final long DEFAULT_HISTORY_FILE_LIMIT = 50000L;

    /** Read buffer size for count files. */
    private static final int COUNTS_READ_BUFFER_SIZE = 1048576;

    /**
     * Logger shared with the static helpers (for example {@link #getKafkaBrokers}).
     * It starts as the class logger so static callers never see {@code null}.
     * Each constructor replaces it with the logger it is given.
     */
    private static Logger log = Logger.getLogger(CamusJob.class);

    private final Properties props;

    /**
     * Phase timings in milliseconds, keyed by phase name.
     * {@link #startTiming} subtracts the current time and {@link #stopTiming} adds it back,
     * so a balanced start/stop pair leaves the accumulated elapsed time in the map.
     * The map is static and unsynchronized.
     */
    private static HashMap<String, Long> timingMap = new HashMap<>();

    /**
     * Accepts paths whose final name component starts with a fixed prefix.
     * Declared static because it needs no access to the enclosing job.
     */
    public static class PrefixFilter implements PathFilter {
        private final String prefix;

        public PrefixFilter(String str) {
            this.prefix = str;
        }

        @Override
        public boolean accept(Path path) {
            return path.getName().startsWith(this.prefix);
        }
    }

    public CamusJob() throws IOException {
        this(new Properties());
    }

    public CamusJob(Properties properties) throws IOException {
        this(properties, Logger.getLogger(CamusJob.class));
    }

    public CamusJob(Properties properties, Logger logger) throws IOException {
        this.props = properties;
        log = logger;
    }

    /** Returns {@code map.get(key)}, or 0 when the key is absent. */
    private static long getOrZero(Map<String, Long> map, String key) {
        Long value = map.get(key);
        return value == null ? 0L : value.longValue();
    }

    /** Starts (or resumes) timing the phase {@code str} by subtracting the current time from its accumulator. */
    public static void startTiming(String str) {
        timingMap.put(str, Long.valueOf(getOrZero(timingMap, str) - System.currentTimeMillis()));
    }

    /** Stops timing the phase {@code str} by adding the current time back to its accumulator. */
    public static void stopTiming(String str) {
        timingMap.put(str, Long.valueOf(getOrZero(timingMap, str) + System.currentTimeMillis()));
    }

    /**
     * Adds the current time to the accumulator for {@code str}. This is arithmetically the same as
     * {@link #stopTiming}; for an unset key it records the current wall-clock time in milliseconds.
     */
    public static void setTime(String str) {
        timingMap.put(str, Long.valueOf(getOrZero(timingMap, str) + System.currentTimeMillis()));
    }

    /**
     * Builds the MapReduce job.
     * Copies {@code properties} into this tool's configuration, creating one if needed.
     * The job name comes from {@code camus.job.name}, defaulting to "Camus Job".
     */
    private Job createJob(Properties properties) throws IOException {
        if (getConf() == null) {
            setConf(new Configuration());
        }
        populateConf(properties, getConf(), log);
        Job job = new Job(getConf());
        job.setJarByClass(CamusJob.class);
        String jobName = job.getConfiguration().get("camus.job.name");
        job.setJobName(jobName != null ? jobName : "Camus Job");
        return job;
    }

    /**
     * Copies every property into {@code configuration}, then adds jars to the distributed-cache classpath:
     * <ul>
     *   <li>every non-directory entry under {@code hdfs.default.classpath.dir};</li>
     *   <li>every comma-separated path in {@code hadoop.external.jarFiles}.</li>
     * </ul>
     *
     * @param properties    properties to copy (keys and values via {@code toString}/{@code getProperty})
     * @param configuration destination configuration, mutated in place
     * @param logger        logger used to report each jar added
     * @throws IOException if the file system cannot be reached or listed
     */
    public static void populateConf(Properties properties, Configuration configuration, Logger logger) throws IOException {
        for (Object key : properties.keySet()) {
            configuration.set(key.toString(), properties.getProperty(key.toString()));
        }
        FileSystem fileSystem = FileSystem.get(configuration);
        String classpathDir = configuration.get("hdfs.default.classpath.dir", (String) null);
        if (classpathDir != null) {
            FileStatus[] entries = fileSystem.listStatus(new Path(classpathDir));
            if (entries != null) {
                for (FileStatus entry : entries) {
                    if (!entry.isDir()) {
                        logger.info("Adding Jar to Distributed Cache Archive File:" + entry.getPath());
                        DistributedCache.addFileToClassPath(entry.getPath(), configuration, fileSystem);
                    }
                }
            } else {
                System.out.println("hdfs.default.classpath.dir " + classpathDir + " is empty.");
            }
        }
        String externalJars = configuration.get("hadoop.external.jarFiles", (String) null);
        if (externalJars != null) {
            for (String jar : externalJars.split(",")) {
                logger.info("Adding external jar File:" + jar);
                DistributedCache.addFileToClassPath(new Path(jar), configuration, fileSystem);
            }
        }
    }

    /**
     * Runs one ETL execution end to end.
     *
     * @throws IllegalArgumentException if the execution base or history path property is missing
     * @throws RuntimeException         if the Hadoop job does not succeed
     */
    public void run() throws Exception {
        startTiming("pre-setup");
        startTiming("total");
        Job job = createJob(this.props);
        if (getLog4jConfigure(job)) {
            DOMConfigurator.configure("log4j.xml");
        }
        FileSystem fs = FileSystem.get(job.getConfiguration());
        log.info("Dir Destination set to: " + EtlMultiOutputFormat.getDestinationPath(job));

        Path execBasePath = new Path(requireProperty(ETL_EXECUTION_BASE_PATH));
        Path execHistory = new Path(requireProperty(ETL_EXECUTION_HISTORY_PATH));
        if (!fs.exists(execBasePath)) {
            log.info("The execution base path does not exist. Creating the directory");
            fs.mkdirs(execBasePath);
        }
        if (!fs.exists(execHistory)) {
            log.info("The history base path does not exist. Creating the directory.");
            fs.mkdirs(execHistory);
        }

        // The newest execution is never pruned; it becomes the input of this run.
        FileStatus[] executions = pruneExecutionHistory(job, fs, execBasePath, execHistory);
        if (executions.length > 0) {
            Path previous = executions[executions.length - 1].getPath();
            FileInputFormat.setInputPaths(job, new Path[] {previous});
            log.info("Previous execution: " + previous.toString());
        } else {
            System.out.println("No previous execution, all topics pulled from earliest available offset");
        }

        String executionName =
                new DateTime().toString(DateUtils.getDateTimeFormatter("YYYY-MM-dd-HH-mm-ss", DateTimeZone.UTC));
        Path newExecutionOutput = new Path(execBasePath, executionName);
        FileOutputFormat.setOutputPath(job, newExecutionOutput);
        log.info("New execution temp location: " + newExecutionOutput.toString());

        EtlInputFormat.setLogger(log);
        job.setInputFormatClass(EtlInputFormat.class);
        job.setOutputFormatClass(EtlMultiOutputFormat.class);
        job.setNumReduceTasks(0);
        stopTiming("pre-setup");

        job.submit();
        job.waitForCompletion(true);
        logCounters(job.getCounters());
        // NOTE(review): the matching startTiming("hadoop") is not in this file; presumably the input format records it.
        stopTiming("hadoop");

        startTiming("commit");
        sendTrackingCounts(job, fs, newExecutionOutput);
        printErrors(fs, newExecutionOutput);
        Path historyPath = new Path(execHistory, executionName);
        log.debug("Moving execution to history : " + historyPath);
        fs.rename(newExecutionOutput, historyPath);
        log.info("Job finished");
        stopTiming("commit");
        stopTiming("total");
        createReport(job, timingMap);

        if (!job.isSuccessful()) {
            printFailedTaskDiagnostics(job);
            throw new RuntimeException("hadoop job failed");
        }
    }

    /** Returns the required property {@code key}, failing with a descriptive message when it is unset. */
    private String requireProperty(String key) {
        String value = this.props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value;
    }

    /**
     * Deletes the oldest execution directories under {@code execHistory} until the entry count of
     * {@code execBasePath} (files plus directories) is within the limit. The newest execution is always kept.
     *
     * <p>The limit is the directory quota times {@value #ETL_EXECUTION_HISTORY_MAX_OF_QUOTA} (default 0.5).
     * HDFS reports a negative quota when none is set, so any non-positive limit falls back to
     * {@link #DEFAULT_HISTORY_FILE_LIMIT}.
     *
     * @return the history entries sorted by name ascending (deleted entries included)
     */
    private FileStatus[] pruneExecutionHistory(Job job, FileSystem fs, Path execBasePath, Path execHistory)
            throws IOException {
        ContentSummary baseSummary = fs.getContentSummary(execBasePath);
        float maxOfQuota = job.getConfiguration().getFloat(ETL_EXECUTION_HISTORY_MAX_OF_QUOTA, 0.5f);
        long limit = (long) (baseSummary.getQuota() * maxOfQuota);
        if (limit <= 0) {
            limit = DEFAULT_HISTORY_FILE_LIMIT;
        }
        long currentCount = baseSummary.getFileCount() + baseSummary.getDirectoryCount();

        FileStatus[] executions = fs.listStatus(execHistory);
        // Execution directories are named by UTC timestamp, so name order is chronological order.
        Arrays.sort(executions, new Comparator<FileStatus>() {
            @Override
            public int compare(FileStatus left, FileStatus right) {
                return left.getPath().getName().compareTo(right.getPath().getName());
            }
        });

        for (int i = 0; i < executions.length - 1 && limit < currentCount; i++) {
            FileStatus execution = executions[i];
            log.info("removing old execution: " + execution.getPath().getName());
            ContentSummary execSummary = fs.getContentSummary(execution.getPath());
            // currentCount counts files AND directories, so deleting must subtract both.
            currentCount -= execSummary.getFileCount() + execSummary.getDirectoryCount();
            fs.delete(execution.getPath(), true);
        }
        return executions;
    }

    /** Logs every counter of every counter group at INFO level. */
    private static void logCounters(Counters counters) {
        for (String groupName : counters.getGroupNames()) {
            CounterGroup group = counters.getGroup(groupName);
            log.info("Group: " + group.getDisplayName());
            for (Counter counter : group) {
                log.info(counter.getDisplayName() + ":\t" + counter.getValue());
            }
        }
    }

    /**
     * Prints the diagnostics of every failed map task to stderr.
     * Uses the job's own id, so it works even when no task completion events were reported.
     */
    private static void printFailedTaskDiagnostics(Job job) throws IOException {
        JobClient client = new JobClient(new JobConf(job.getConfiguration()));
        for (TaskReport report : client.getMapTaskReports(JobID.downgrade(job.getJobID()))) {
            if (report.getCurrentStatus().equals(TIPStatus.FAILED)) {
                for (String diagnostic : report.getDiagnostics()) {
                    System.err.println("task error: " + diagnostic);
                }
            }
        }
    }

    /**
     * Prints every (key, exception) record from the error sequence files in {@code path} to stderr.
     * Each reader is closed even if reading fails.
     */
    public void printErrors(FileSystem fileSystem, Path path) throws IOException {
        for (FileStatus status : fileSystem.listStatus(path, new PrefixFilter(EtlMultiOutputFormat.ERRORS_PREFIX))) {
            try (SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, status.getPath(), fileSystem.getConf())) {
                EtlKey key = new EtlKey();
                ExceptionWritable exception = new ExceptionWritable();
                while (reader.next(key, exception)) {
                    System.err.println(key.toString());
                    System.err.println(exception.toString());
                }
            }
        }
    }

    /**
     * Merges the per-task count files in {@code path} by topic and posts one tracking record per topic to Kafka.
     * Does nothing when tracking posts are disabled for this job.
     *
     * <p>After reading, each count file is either moved to {@value #ETL_COUNTS_PATH} (when
     * {@value #ETL_KEEP_COUNT_FILES} is "true") or deleted.
     */
    public void sendTrackingCounts(JobContext jobContext, FileSystem fileSystem, Path path)
            throws IOException, URISyntaxException {
        if (!EtlMultiOutputFormat.isRunTrackingPost(jobContext)) {
            return;
        }
        PrefixFilter countsFilter = new PrefixFilter(EtlMultiOutputFormat.COUNTS_PREFIX);
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        Map<String, EtlCounts> countsByTopic = new HashMap<String, EtlCounts>();
        for (FileStatus status : fileSystem.listStatus(path, countsFilter)) {
            for (EtlCounts counts : readCountsFile(fileSystem, status.getPath(), mapper)) {
                String topic = counts.getTopic();
                if (countsByTopic.containsKey(topic)) {
                    mergeCounts(countsByTopic.get(topic), counts);
                } else {
                    countsByTopic.put(topic, counts);
                }
            }
        }

        boolean keepCountFiles = this.props.getProperty(ETL_KEEP_COUNT_FILES, "false").equals("true");
        for (FileStatus status : fileSystem.listStatus(path, countsFilter)) {
            if (keepCountFiles) {
                fileSystem.rename(status.getPath(),
                        new Path(this.props.getProperty(ETL_COUNTS_PATH), status.getPath().getName()));
            } else {
                fileSystem.delete(status.getPath(), true);
            }
        }

        String kafkaBrokers = getKafkaBrokers(jobContext);
        String monitorTier = this.props.getProperty(KAFKA_MONITOR_TIER);
        for (EtlCounts counts : countsByTopic.values()) {
            counts.postTrackingCountToKafka(jobContext.getConfiguration(), monitorTier, kafkaBrokers);
        }
    }

    /**
     * Reads one count file, concatenating its lines, and parses it as a JSON array of {@link EtlCounts}.
     * The reader is always closed. A JSON {@code null} document yields an empty list.
     */
    private static List<EtlCounts> readCountsFile(FileSystem fs, Path file, ObjectMapper mapper) throws IOException {
        StringBuilder json = new StringBuilder();
        // NOTE(review): uses the platform default charset, as before; confirm the writer's encoding.
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(fs.open(file)), COUNTS_READ_BUFFER_SIZE)) {
            String line;
            while ((line = reader.readLine()) != null) {
                json.append(line);
            }
        }
        ArrayList<EtlCounts> parsed = mapper.readValue(json.toString(), new TypeReference<ArrayList<EtlCounts>>() {
        });
        return parsed == null ? new ArrayList<EtlCounts>() : parsed;
    }

    /**
     * Folds {@code addition} into {@code target}, which belongs to the same topic.
     * <ul>
     *   <li>Time ranges are widened.</li>
     *   <li>Error counts are summed.</li>
     *   <li>Granularity and topic are taken from {@code addition}.</li>
     *   <li>Per-source counts, keyed by {@code Source.toString()}, are summed.</li>
     * </ul>
     */
    private static void mergeCounts(EtlCounts target, EtlCounts addition) {
        target.setEndTime(Math.max(target.getEndTime(), addition.getEndTime()));
        target.setLastTimestamp(Math.max(target.getLastTimestamp(), addition.getLastTimestamp()));
        target.setStartTime(Math.min(target.getStartTime(), addition.getStartTime()));
        target.setFirstTimestamp(Math.min(target.getFirstTimestamp(), addition.getFirstTimestamp()));
        target.setErrorCount(target.getErrorCount() + addition.getErrorCount());
        target.setGranularity(addition.getGranularity());
        target.setTopic(addition.getTopic());
        for (Source source : addition.getCounts().values()) {
            String key = source.toString();
            if (target.getCounts().containsKey(key)) {
                Source existing = target.getCounts().get(key);
                existing.setCount(existing.getCount() + source.getCount());
                // Re-keyed by toString() after the update, exactly as before.
                target.getCounts().put(existing.toString(), existing);
            } else {
                target.getCounts().put(key, source);
            }
        }
    }

    /**
     * Logs a timing report built from the phase timings in {@code map} and the job's map-task reports.
     * The report covers phase times, task duration and wait statistics, and the "total" counter breakdown.
     *
     * <p>Missing timing keys are treated as 0. Percentages use floating-point division, and an empty
     * job reports 0% rather than failing. "getSplits" and "hadoop_start" are presumably recorded outside
     * this file.
     */
    private void createReport(Job job, Map<String, Long> map) throws IOException {
        StringBuilder sb = new StringBuilder();
        sb.append("***********Timing Report*************\n");
        sb.append("Job time (seconds):\n");
        double preSetup = getOrZero(map, "pre-setup") / 1000.0d;
        double getSplits = getOrZero(map, "getSplits") / 1000.0d;
        double hadoop = getOrZero(map, "hadoop") / 1000.0d;
        double commit = getOrZero(map, "commit") / 1000.0d;
        double total = getOrZero(map, "total") / 1000.0d;
        appendPhase(sb, "pre setup", preSetup, total);
        appendPhase(sb, "get splits", getSplits, total);
        appendPhase(sb, "hadoop job", hadoop, total);
        appendPhase(sb, "commit", commit, total);
        sb.append(String.format("Total: %d minutes %d seconds\n",
                Integer.valueOf(((int) total) / 60), Integer.valueOf(((int) total) % 60)));

        TaskReport[] reports = new JobClient(new JobConf(job.getConfiguration()))
                .getMapTaskReports(JobID.downgrade(job.getJobID()));
        long hadoopStart = getOrZero(map, "hadoop_start");
        double minWait = Long.MAX_VALUE;
        double maxWait = 0.0d;
        double sumWait = 0.0d;
        double minRun = Long.MAX_VALUE;
        double maxRun = 0.0d;
        long totalRunMillis = 0;
        for (TaskReport report : reports) {
            long wait = report.getStartTime() - hadoopStart;
            minWait = Math.min(minWait, wait);
            maxWait = Math.max(maxWait, wait);
            sumWait += wait;
            long runTime = report.getFinishTime() - report.getStartTime();
            totalRunMillis += runTime;
            minRun = Math.min(minRun, runTime);
            maxRun = Math.max(maxRun, runTime);
        }
        int taskCount = reports.length;
        if (taskCount == 0) {
            minWait = 0.0d;
            minRun = 0.0d;
        }
        double meanWaitSec = taskCount == 0 ? 0.0d : sumWait / taskCount / 1000.0d;
        double meanRunSec = taskCount == 0 ? 0.0d : (double) totalRunMillis / taskCount / 1000.0d;
        double maxRunSec = maxRun / 1000.0d;
        double skew = maxRunSec == 0.0d ? 0.0d : meanRunSec / maxRunSec;

        sb.append("\nHadoop job task times (seconds):\n");
        sb.append(String.format("    %12s %6.1f\n", "min", Double.valueOf(minRun / 1000.0d)));
        sb.append(String.format("    %12s %6.1f\n", "mean", Double.valueOf(meanRunSec)));
        sb.append(String.format("    %12s %6.1f\n", "max", Double.valueOf(maxRunSec)));
        sb.append(String.format("    %12s %6.1f/%.1f = %.2f\n", "skew",
                Double.valueOf(meanRunSec), Double.valueOf(maxRunSec), Double.valueOf(skew)));
        sb.append("\nTask wait time (seconds):\n");
        sb.append(String.format("    %12s %6.1f\n", "min", Double.valueOf(minWait / 1000.0d)));
        sb.append(String.format("    %12s %6.1f\n", "mean", Double.valueOf(meanWaitSec)));
        sb.append(String.format("    %12s %6.1f\n", "max", Double.valueOf(maxWait / 1000.0d)));

        CounterGroup totals = job.getCounters().getGroup("total");
        long decodeMillis = totals.findCounter("decode-time(ms)").getValue();
        long requestMillis = totals.findCounter("request-time(ms)").getValue();
        long mapperMillis = totals.findCounter("mapper-time(ms)").getValue();
        long bytesRead = totals.findCounter("data-read").getValue();
        sb.append("\nHadoop task breakdown:\n");
        appendShare(sb, "kafka", requestMillis, totalRunMillis);
        appendShare(sb, "decode", decodeMillis, totalRunMillis);
        appendShare(sb, "map output", mapperMillis, totalRunMillis);
        appendShare(sb, "other", totalRunMillis - mapperMillis - requestMillis - decodeMillis, totalRunMillis);
        sb.append(String.format("\n%16s %s\n", "Total MB read:", Long.valueOf((bytesRead / 1024) / 1024)));
        log.info(sb.toString());
    }

    /** Appends one "label seconds (percent of total)" report line. */
    private static void appendPhase(StringBuilder sb, String label, double seconds, double totalSeconds) {
        sb.append(String.format("    %12s %6.1f (%s)\n", label, Double.valueOf(seconds),
                NumberFormat.getPercentInstance().format(seconds / totalSeconds)));
    }

    /** Appends one "label percent" line; uses floating-point division and reports 0% when {@code whole} is 0. */
    private static void appendShare(StringBuilder sb, String label, long part, long whole) {
        double share = whole == 0 ? 0.0d : (double) part / whole;
        sb.append(String.format("    %12s %s\n", label, NumberFormat.getPercentInstance().format(share)));
    }

    public static void main(String[] strArr) throws Exception {
        ToolRunner.run(new CamusJob(), strArr);
    }

    /**
     * Command-line entry point.
     * Properties are loaded from a classpath resource ({@code -p}) and/or a file ({@code -P}),
     * overlaid with {@code -D property=value} pairs, and then the job is run.
     *
     * @return 1 (after printing usage) when neither {@code -p} nor {@code -P} is given, otherwise 0
     * @throws IllegalArgumentException if the {@code -p} resource is not on the classpath
     */
    public int run(String[] strArr) throws Exception {
        Options options = new Options();
        options.addOption("p", true, "properties filename from the classpath");
        options.addOption("P", true, "external properties filename");
        OptionBuilder.withArgName("property=value");
        OptionBuilder.hasArgs(2);
        OptionBuilder.withValueSeparator();
        OptionBuilder.withDescription("use value for given property");
        options.addOption(OptionBuilder.create("D"));
        CommandLine parse = new PosixParser().parse(options, strArr);
        if (!parse.hasOption('p') && !parse.hasOption('P')) {
            new HelpFormatter().printHelp("CamusJob.java", options);
            return 1;
        }
        if (parse.hasOption('p')) {
            String resource = parse.getOptionValue('p');
            InputStream in = getClass().getClassLoader().getResourceAsStream(resource);
            if (in == null) {
                throw new IllegalArgumentException("Properties resource not found on classpath: " + resource);
            }
            loadProperties(in);
        }
        if (parse.hasOption('P')) {
            loadProperties(new FileInputStream(new File(parse.getOptionValue('P'))));
        }
        this.props.putAll(parse.getOptionProperties("D"));
        run();
        return 0;
    }

    /** Loads {@code in} into this job's properties and always closes the stream. */
    private void loadProperties(InputStream in) throws IOException {
        try (InputStream stream = in) {
            this.props.load(stream);
        }
    }

    /** Returns {@value #POST_TRACKING_COUNTS_TO_KAFKA} (default true). */
    public static boolean getPostTrackingCountsToKafka(Job job) {
        return job.getConfiguration().getBoolean(POST_TRACKING_COUNTS_TO_KAFKA, true);
    }

    /** Returns {@value #KAFKA_FETCH_REQUEST_MIN_BYTES} (default 1024). */
    public static int getKafkaFetchRequestMinBytes(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_FETCH_REQUEST_MIN_BYTES, 1024);
    }

    /** Returns {@value #KAFKA_FETCH_REQUEST_MAX_WAIT} (default 1000). */
    public static int getKafkaFetchRequestMaxWait(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_FETCH_REQUEST_MAX_WAIT, 1000);
    }

    /**
     * Returns {@value #KAFKA_BROKERS}.
     * When it is unset, falls back to the deprecated {@value #KAFKA_HOST_URL}:{@value #KAFKA_HOST_PORT}
     * pair (port default 10251) and logs a warning. Returns {@code null} when neither is set.
     */
    public static String getKafkaBrokers(JobContext jobContext) {
        String brokers = jobContext.getConfiguration().get(KAFKA_BROKERS);
        if (brokers == null) {
            brokers = jobContext.getConfiguration().get(KAFKA_HOST_URL);
            if (brokers != null) {
                log.warn("The configuration properties kafka.host.url and kafka.host.port are deprecated. Please switch to using kafka.brokers");
                return brokers + ":" + jobContext.getConfiguration().getInt(KAFKA_HOST_PORT, 10251);
            }
        }
        return brokers;
    }

    /** Returns {@value #KAFKA_FETCH_REQUEST_CORRELATION_ID} (default -1). */
    public static int getKafkaFetchRequestCorrelationId(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_FETCH_REQUEST_CORRELATION_ID, -1);
    }

    /** Returns {@value #KAFKA_CLIENT_NAME}, or {@code null} when unset. */
    public static String getKafkaClientName(JobContext jobContext) {
        return jobContext.getConfiguration().get(KAFKA_CLIENT_NAME);
    }

    /** Returns the raw string value of {@value #KAFKA_FETCH_BUFFER_SIZE}, or {@code null} when unset. */
    public static String getKafkaFetchRequestBufferSize(JobContext jobContext) {
        return jobContext.getConfiguration().get(KAFKA_FETCH_BUFFER_SIZE);
    }

    /** Returns {@value #KAFKA_TIMEOUT_VALUE} (default 30000). */
    public static int getKafkaTimeoutValue(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_TIMEOUT_VALUE, 30000);
    }

    /** Returns {@value #KAFKA_FETCH_BUFFER_SIZE} as an int (default 1048576). */
    public static int getKafkaBufferSize(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(KAFKA_FETCH_BUFFER_SIZE, 1048576);
    }

    /** Returns {@value #LOG4J_CONFIGURATION} (default false); when true, {@code run()} loads "log4j.xml". */
    public static boolean getLog4jConfigure(JobContext jobContext) {
        return jobContext.getConfiguration().getBoolean(LOG4J_CONFIGURATION, false);
    }
}
