package com.linkedin.camus.sweeper;

import com.linkedin.camus.sweeper.CamusSweeper;
import com.linkedin.camus.sweeper.mapreduce.CamusSweeperJob;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/linkedin/camus/sweeper/CamusSingleFolderSweeper.class */
public class CamusSingleFolderSweeper extends CamusSweeper {
    /** Property naming the {@link CamusSweeperJob} implementation that KafkaCollector.run() instantiates reflectively. */
    private static final String CAMUS_SWEEPER_IO_CONFIGURER_CLASS = "camus.sweeper.io.configurer.class";
    /** Job configuration key that KafkaCollector.run() sets to {@link #DEFAULT_MAPRED_COMPRESS_MAP_OUTPUT}. */
    private static final String MAPRED_COMPRESS_MAP_OUTPUT = "mapred.compress.map.output";
    /** Property whose value is used as the per-job key for metrics and in log messages. */
    static final String TOPIC_AND_HOUR = "topic.and.hour";
    /** Name of the state file written by createStateFileInFolder and read by getDestinationModTime. */
    static final String STATE_FILE_NAME = "_state";
    /** Key inside the state file holding the recorded time, in epoch milliseconds. */
    static final String MAPREDUCE_SUBMIT_TIME = "mapreduce.submit.time";
    /** Property key exposed to other classes; it is not referenced anywhere in this file. */
    public static final String FOLDER_HOUR = "camus.sweeper.folder.hour";
    /** Timing/size metrics collected during a run and reported by reportMetrics(). */
    private final CamusSweeperMetrics metrics;
    private static final Logger LOG = Logger.getLogger(CamusSingleFolderSweeper.class);
    /** Value unconditionally written to {@link #MAPRED_COMPRESS_MAP_OUTPUT} for every job (evaluates to true). */
    private static final boolean DEFAULT_MAPRED_COMPRESS_MAP_OUTPUT = Boolean.TRUE.booleanValue();

    /* loaded from: input_file:com/linkedin/camus/sweeper/CamusSingleFolderSweeper$KafkaCollector.class */
    private class KafkaCollector extends CamusSweeper.KafkaCollector {
        private final String topicAndHour;

        public KafkaCollector(Properties properties, String str, String str2) throws IOException {
            super(properties, str, str2);
            this.topicAndHour = properties.getProperty(CamusSingleFolderSweeper.TOPIC_AND_HOUR);
        }

        @Override // com.linkedin.camus.sweeper.CamusSweeper.KafkaCollector
        public void run() throws Exception {
            CamusSingleFolderSweeper.this.metrics.recordRunnerStartTimeByTopic(this.topicAndHour, System.currentTimeMillis());
            this.job.getConfiguration().setBoolean(CamusSingleFolderSweeper.MAPRED_COMPRESS_MAP_OUTPUT, CamusSingleFolderSweeper.DEFAULT_MAPRED_COMPRESS_MAP_OUTPUT);
            ((CamusSweeperJob) Class.forName(this.props.getProperty(CamusSingleFolderSweeper.CAMUS_SWEEPER_IO_CONFIGURER_CLASS)).newInstance()).setLogger(CamusSingleFolderSweeper.LOG).configureJob(this.topicName, this.job);
            setNumOfReducersAndSplitSizes();
            long currentTimeMillis = System.currentTimeMillis();
            CamusSingleFolderSweeper.this.metrics.recordMrSubmitTimeByTopic(this.topicAndHour, currentTimeMillis);
            submitMrJob();
            moveTmpPathToOutputPath();
            CamusSingleFolderSweeper.createStateFileInFolder(this.fs, this.outputPath, currentTimeMillis);
        }

        @Override // com.linkedin.camus.sweeper.CamusSweeper.KafkaCollector
        protected void submitMrJob() throws IOException, InterruptedException, ClassNotFoundException {
            this.job.submit();
            CamusSingleFolderSweeper.this.runningJobs.add(this.job);
            CamusSingleFolderSweeper.this.metrics.recordMrStartRunningTimeByTopic(this.topicAndHour, System.currentTimeMillis());
            CamusSingleFolderSweeper.LOG.info("job running for: " + this.props.getProperty(CamusSingleFolderSweeper.TOPIC_AND_HOUR) + ", url: " + this.job.getTrackingURL());
            this.job.waitForCompletion(false);
            CamusSingleFolderSweeper.this.metrics.recordMrFinishTimeByTopic(this.topicAndHour, System.currentTimeMillis());
            if (!this.job.isSuccessful()) {
                throw new RuntimeException("hadoop job failed.");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/camus/sweeper/CamusSingleFolderSweeper$KafkaCollectorRunner.class */
    public class KafkaCollectorRunner extends CamusSweeper.KafkaCollectorRunner {
        public KafkaCollectorRunner(String str, Properties properties, List<CamusSweeper.SweeperError> list, String str2) {
            super(str, properties, list, str2);
        }

        @Override // com.linkedin.camus.sweeper.CamusSweeper.KafkaCollectorRunner, java.lang.Runnable
        public void run() {
            KafkaCollector kafkaCollector = null;
            try {
                CamusSingleFolderSweeper.LOG.info("Starting runner for " + this.props.getProperty(CamusSingleFolderSweeper.TOPIC_AND_HOUR));
                kafkaCollector = new KafkaCollector(this.props, this.name, this.topic);
                CamusSingleFolderSweeper.LOG.info("Running " + this.name + " for input " + this.props.getProperty("input.paths"));
                kafkaCollector.run();
            } catch (Throwable th) {
                CamusSingleFolderSweeper.LOG.error(new StringBuilder().append("Failed for ").append(this.name).append(" ,job: ").append(kafkaCollector).toString() == null ? null : kafkaCollector.getJob() + " failed for " + this.props.getProperty("input.paths") + " Exception:" + th.getLocalizedMessage());
                this.errorQueue.add(new CamusSweeper.SweeperError(this.name, this.props.get("input.paths").toString(), th));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/camus/sweeper/CamusSingleFolderSweeper$OutlierCollectorRunner.class */
    /**
     * Task that looks for files in the input folders which are newer than the
     * destination's recorded time and places them in an "outlier" sub-folder
     * of the destination.
     */
    public class OutlierCollectorRunner implements Callable<Void> {
        private final Properties props;
        private final FileSystem fs;

        public OutlierCollectorRunner(Properties properties, FileSystem fileSystem) {
            this.props = properties;
            this.fs = fileSystem;
        }

        /**
         * Scans every comma-separated entry of the "input.paths" property and
         * handles each child file whose modification time is later than the
         * value returned by {@code getDestinationModTime} for "dest.path".
         * After each input path that contributed at least one outlier, the
         * destination's state file is rewritten with the newest outlier
         * modification time found under that path.
         *
         * @return always {@code null}
         * @throws IOException if a file system operation fails
         */
        @Override // java.util.concurrent.Callable
        public Void call() throws IOException {
            final String inputPaths = this.props.getProperty("input.paths");
            final String destination = this.props.getProperty("dest.path");
            final long destinationModTime = CamusSingleFolderSweeper.getDestinationModTime(this.fs, destination);

            final Path outlierDir = new Path(destination, "outlier");
            this.fs.mkdirs(outlierDir);

            for (String inputPath : inputPaths.split(",")) {
                long newestOutlierModTime = Long.MIN_VALUE;

                final FileStatus[] children = this.fs.globStatus(new Path(new Path(inputPath), "*"), new HiddenFilter());
                for (FileStatus child : children) {
                    final long childModTime = child.getModificationTime();
                    if (childModTime <= destinationModTime) {
                        continue;
                    }
                    newestOutlierModTime = Math.max(newestOutlierModTime, childModTime);

                    final Path source = child.getPath();
                    CamusSingleFolderSweeper.LOG.info("copying outlier file " + source + " to " + outlierDir);
                    // NOTE(review): the file is copied (source kept) and then renamed onto the
                    // same target name; the rename result is ignored. Confirm which of the two
                    // operations is actually intended.
                    FileUtil.copy(this.fs, source, this.fs, outlierDir, false, true, new Configuration());
                    this.fs.rename(source, new Path(outlierDir, source.getName()));
                }

                if (newestOutlierModTime == Long.MIN_VALUE) {
                    // No outlier under this input path: leave the state file untouched.
                    continue;
                }
                CamusSingleFolderSweeper.createStateFileInFolder(this.fs, new Path(destination), newestOutlierModTime);
            }
            return null;
        }
    }

    /** Creates a sweeper with a fresh metrics collector, using the superclass's no-argument constructor. */
    public CamusSingleFolderSweeper() {
        this.metrics = new CamusSweeperMetrics();
    }

    /**
     * Creates a sweeper with a fresh metrics collector.
     *
     * @param properties passed unchanged to the superclass constructor
     */
    public CamusSingleFolderSweeper(Properties properties) {
        super(properties);
        this.metrics = new CamusSweeperMetrics();
    }

    /**
     * Finds the topic folders directly under {@code path}.
     *
     * <p>Only the immediate children of {@code path} are considered. A child
     * is reported when it is a directory and {@code pathFilter} accepts its
     * path; its directory name is used as the topic name.
     *
     * @param path       folder whose children are listed
     * @param pathFilter filter applied to each child directory's path
     * @param str        not used by this implementation
     * @param fileSystem file system used for listing and status lookups
     * @return map from each accepted directory's status to its name
     * @throws IOException if listing or a status lookup fails
     */
    @Override // com.linkedin.camus.sweeper.CamusSweeper
    public Map<FileStatus, String> findAllTopics(Path path, PathFilter pathFilter, String str, FileSystem fileSystem) throws IOException {
        Map<FileStatus, String> topics = new HashMap<FileStatus, String>();
        for (FileStatus child : fileSystem.listStatus(path)) {
            if (!child.isDir() || !pathFilter.accept(child.getPath())) {
                continue;
            }
            Path topicDir = child.getPath();
            String topicName = topicDir.getName();
            LOG.info("found topic: " + topicName);
            topics.put(fileSystem.getFileStatus(topicDir), topicName);
        }
        return topics;
    }

    /**
     * Runs the sweep: stamps the start time on the metrics, delegates to the
     * superclass run, then reports metrics and finally processes outliers.
     *
     * @throws Exception if any of the steps fails; later steps are then skipped
     */
    @Override // com.linkedin.camus.sweeper.CamusSweeper
    public void run() throws Exception {
        final long sweepStartMillis = System.currentTimeMillis();
        this.metrics.setTimeStart(sweepStartMillis);

        super.run();

        reportMetrics();
        addOutliers();
    }

    /**
     * Submits one {@link OutlierCollectorRunner} per outlier property set
     * supplied by the planner, then waits for all of them to finish.
     *
     * @throws IOException      if obtaining the file system fails
     * @throws RuntimeException wrapping the cause if a task fails or if the
     *         wait is interrupted (the interrupt flag is restored first)
     */
    private void addOutliers() throws IOException {
        createExecutorService();

        List<Future<?>> pending = new ArrayList<Future<?>>();
        for (Properties outlierProps : this.planner.getOutlierProperties()) {
            pending.add(this.executorService.submit(new OutlierCollectorRunner(outlierProps, FileSystem.get(getConf()))));
        }
        // No further tasks will be submitted; already-submitted ones keep running.
        this.executorService.shutdown();

        for (Future<?> task : pending) {
            try {
                task.get();
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Emits every metric report offered by the metrics collector, bracketed
     * by a start and a finish log line.
     */
    private void reportMetrics() throws IllegalArgumentException, IOException {
        final CamusSweeperMetrics sweeperMetrics = this.metrics;

        LOG.info("reporting metrics");

        // Overall figures first, then the per-phase durations.
        sweeperMetrics.reportTotalRunningTime();
        sweeperMetrics.reportTotalDataSize();
        sweeperMetrics.reportDataSizeByTopic();

        sweeperMetrics.reportDurationFromStartToRunnerStart();
        sweeperMetrics.reportDurationFromRunnerStartToMRSubmitted();
        sweeperMetrics.reportDurationFromMRSubmittedToMRStarted();
        sweeperMetrics.reportDurationFromMRStartedToMRFinished();

        LOG.info("finished reporting metrics");
    }

    /**
     * Determines the reference time of a destination folder.
     *
     * <p>If the folder directly contains a regular file named
     * {@link #STATE_FILE_NAME} with a {@link #MAPREDUCE_SUBMIT_TIME} entry,
     * that entry is returned. Otherwise the folder's own modification time
     * is returned.
     *
     * @param fileSystem file system holding the folder
     * @param str        path of the destination folder
     * @return the recorded time from the state file, or the folder's
     *         modification time when no usable state file exists
     * @throws IOException           if listing, opening or reading fails
     * @throws NumberFormatException if the recorded value is not a valid long
     */
    public static long getDestinationModTime(FileSystem fileSystem, String str) throws IOException {
        Path destinationDir = new Path(str);
        for (FileStatus entry : fileSystem.listStatus(destinationDir)) {
            if (entry.isDir() || !STATE_FILE_NAME.equals(entry.getPath().getName())) {
                continue;
            }
            LOG.info("Found state file: " + entry.getPath());

            Properties state = new Properties();
            // try-with-resources closes the stream exactly once and keeps a
            // close() failure from hiding the exception that caused it.
            try (InputStream in = fileSystem.open(entry.getPath())) {
                state.load(in);
            }
            if (state.containsKey(MAPREDUCE_SUBMIT_TIME)) {
                return Long.parseLong(state.getProperty(MAPREDUCE_SUBMIT_TIME));
            }
            // State file without the expected key: keep scanning, then fall back below.
        }
        return fileSystem.getFileStatus(destinationDir).getModificationTime();
    }

    /**
     * Command-line entry point: runs a new sweeper through {@code ToolRunner}
     * with the given arguments. The value returned by {@code ToolRunner.run}
     * is not inspected.
     */
    public static void main(String[] strArr) throws Exception {
        CamusSingleFolderSweeper sweeper = new CamusSingleFolderSweeper();
        ToolRunner.run(sweeper, strArr);
    }

    /**
     * Starts one collector for every job property set the planner creates
     * for the given topic.
     *
     * <p>The futures returned by {@code runCollector} are not tracked here;
     * this method returns as soon as all collectors have been submitted.
     *
     * @param fileSystem file system passed to the planner
     * @param str        topic name
     * @param path       source folder of the topic
     * @param path2      destination folder of the topic
     * @throws Exception if the planner fails to create the job properties
     */
    @Override // com.linkedin.camus.sweeper.CamusSweeper
    protected void runCollectorForTopicDir(FileSystem fileSystem, String str, Path path, Path path2) throws Exception {
        LOG.info("Running collector for topic " + str + " source:" + path + " dest:" + path2);
        for (Properties jobProps : this.planner.createSweeperJobProps(str, path, path2, fileSystem, this.metrics)) {
            runCollector(jobProps, str);
        }
    }

    /**
     * Prepares the job properties for one collector run and submits a
     * {@link KafkaCollectorRunner} to the executor service.
     *
     * <p>Sets "tmp.path" to a unique location under "camus.sweeper.tmp.dir"
     * and, when a "reduce.count.override.&lt;topic&gt;" entry is present, copies
     * its value into "reducer.count".
     *
     * @param properties job properties; modified in place
     * @param str        topic name
     * @return the future of the submitted runner
     * @throws NumberFormatException if the reducer override is not an integer
     */
    @Override // com.linkedin.camus.sweeper.CamusSweeper
    protected Future<?> runCollector(Properties properties, String str) {
        String runnerName = str + "-" + UUID.randomUUID().toString();
        properties.setProperty("tmp.path",
                properties.getProperty("camus.sweeper.tmp.dir") + "/" + runnerName + "_" + System.currentTimeMillis());

        String overrideKey = "reduce.count.override." + str;
        if (properties.containsKey(overrideKey)) {
            // Parse first so that a malformed override still fails fast, then
            // store the value as a String: Properties.getProperty returns null
            // for non-String values, so an Integer entry would be unreadable.
            int reducerCount = Integer.parseInt(properties.getProperty(overrideKey));
            properties.setProperty("reducer.count", String.valueOf(reducerCount));
        }

        LOG.info("Processing " + properties.get("input.paths"));
        return this.executorService.submit(new KafkaCollectorRunner(runnerName, properties, this.errorMessages, str));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Replaces the state file of a folder with one recording the given time.
     *
     * <p>Any existing regular file named {@link #STATE_FILE_NAME} directly
     * inside {@code path} is deleted first. A new state file is then written
     * containing a single {@link #MAPREDUCE_SUBMIT_TIME} entry.
     *
     * @param fileSystem file system holding the folder
     * @param path       folder that receives the state file
     * @param j          time to record, stored as a decimal string
     * @throws IOException if the old state file cannot be deleted or the new
     *         one cannot be written
     */
    public static void createStateFileInFolder(FileSystem fileSystem, Path path, long j) throws IOException {
        for (FileStatus entry : fileSystem.listStatus(path)) {
            boolean isStateFile = !entry.isDir() && STATE_FILE_NAME.equals(entry.getPath().getName());
            if (isStateFile && !fileSystem.delete(entry.getPath(), false)) {
                throw new IOException("Failed to delete state file " + entry.getPath());
            }
        }

        Properties state = new Properties();
        state.setProperty(MAPREDUCE_SUBMIT_TIME, String.valueOf(j));

        // try-with-resources closes the stream exactly once and keeps a
        // close() failure from hiding the exception that caused it.
        try (OutputStream out = fileSystem.create(new Path(path, STATE_FILE_NAME))) {
            state.store(out, "");
        }
    }
}
