package com.linkedin.camus.sweeper;

import com.linkedin.camus.sweeper.mapreduce.CamusSweeperJob;
import com.linkedin.camus.sweeper.utils.PriorityExecutor;
import com.linkedin.camus.sweeper.utils.Utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
import org.mortbay.log.Log;

/* loaded from: input_file:com/linkedin/camus/sweeper/CamusSweeper.class */
/**
 * Hadoop {@link Tool} that discovers topic directories under {@code camus.sweeper.source.dir}
 * and, for every job planned by the configured {@link CamusSweeperPlanner}, runs a MapReduce job
 * whose output replaces the corresponding directory under {@code camus.sweeper.dest.dir}
 * (which defaults to the source directory).
 *
 * <p>Jobs run concurrently on a {@link PriorityExecutor}. Topics listed in
 * {@code camus.sweeper.priority.list} (format {@code topic[=priority],...}) are given the listed
 * priority, or 1 when omitted; all other topics get priority 0. Failures are collected and
 * reported once every job has finished, after which {@link #run()} throws.
 */
public class CamusSweeper extends Configured implements Tool {
    protected static final String DEFAULT_NUM_THREADS = "5";
    protected static final String CAMUS_SWEEPER_PRIORITY_LIST = "camus.sweeper.priority.list";
    private static final String MAX_FILES = "max.files";
    private static final int DEFAULT_MAX_FILES = 24;
    private static final String REDUCER_COUNT = "reducer.count";
    private static final int DEFAULT_REDUCER_COUNT = 45;
    private static final String MAPRED_MIN_SPLIT_SIZE = "mapred.min.split.size";
    private static final String MAPRED_MAX_SPLIT_SIZE = "mapred.max.split.size";
    private static final String TMP_PATH = "tmp.path";
    static final String INPUT_PATHS = "input.paths";
    static final String DEST_PATH = "dest.path";
    protected List<SweeperError> errorMessages;
    protected List<Job> runningJobs;
    protected Properties props;
    protected FileSystem fileSystem;
    protected ExecutorService executorService;
    protected FsPermission perm;
    protected String destSubdir;
    protected String sourceSubdir;
    private static final Logger log = Logger.getLogger(CamusSweeper.class);
    protected CamusSweeperPlanner planner;
    protected Map<String, Integer> priorityTopics;

    /**
     * Builds, runs and publishes a single sweeper MapReduce job.
     *
     * <p>The job reads every path listed in {@code input.paths} and writes to {@code tmp.path}.
     * On success the temporary directory is renamed onto {@code dest.path}, replacing whatever
     * was there before.
     */
    protected class KafkaCollector {
        /** Property holding the desired size, in bytes, of each output file. */
        protected static final String TARGET_FILE_SIZE = "camus.sweeper.target.file.size";
        /** Default target output file size: 1.5 GiB. */
        protected static final long TARGET_FILE_SIZE_DEFAULT = 1610612736;
        protected long targetFileSize;
        protected final String jobName;
        protected final Properties props;
        protected final String topicName;
        protected final Path[] inputPaths;
        protected final Path tmpPath;
        protected final Path outputPath;
        protected final FileSystem fs;
        protected Job job;

        /**
         * Creates the Hadoop job and registers its input paths and temporary output path.
         *
         * @param properties job properties; every entry is copied into the job configuration
         * @param jobName name given to the Hadoop job
         * @param topicName topic being swept, handed to the configured {@link CamusSweeperJob}
         * @throws IOException if the job or its file system cannot be created
         */
        public KafkaCollector(Properties properties, String jobName, String topicName) throws IOException {
            this.jobName = jobName;
            this.props = properties;
            this.topicName = topicName;
            this.targetFileSize = properties.containsKey(TARGET_FILE_SIZE)
                    ? Long.parseLong(properties.getProperty(TARGET_FILE_SIZE))
                    : TARGET_FILE_SIZE_DEFAULT;
            this.job = new Job(CamusSweeper.this.getConf());
            this.job.setJarByClass(CamusSweeper.class);
            this.job.setJobName(jobName);
            Configuration conf = this.job.getConfiguration();
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                // String.valueOf rather than a cast: a non-String value in the Properties must not
                // abort the whole job with a ClassCastException.
                conf.set(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
            }
            this.fs = FileSystem.get(conf);
            this.inputPaths = getInputPaths();
            this.tmpPath = new Path(conf.get(TMP_PATH));
            this.outputPath = new Path(conf.get(DEST_PATH));
            addInputAndOutputPathsToFileInputFormat();
        }

        /** Registers every input path with the job and points its output at {@link #tmpPath}. */
        private void addInputAndOutputPathsToFileInputFormat() throws IOException {
            for (Path path : this.inputPaths) {
                FileInputFormat.addInputPath(this.job, path);
            }
            FileOutputFormat.setOutputPath(this.job, this.tmpPath);
        }

        /** Parses the {@code input.paths} property into Hadoop paths. */
        private Path[] getInputPaths() {
            List<String> paths = Utils.getStringList(this.props, INPUT_PATHS);
            Path[] result = new Path[paths.size()];
            for (int i = 0; i < result.length; i++) {
                result[i] = new Path(paths.get(i));
            }
            return result;
        }

        /**
         * Configures the job via {@code camus.sweeper.io.configurer.class}, sizes it, runs it to
         * completion, and publishes its output.
         *
         * @throws Exception if configuration, execution or publishing fails
         */
        public void run() throws Exception {
            this.job.getConfiguration().set("mapred.compress.map.output", "true");
            CamusSweeperJob configurer = (CamusSweeperJob) Class.forName(
                    this.props.getProperty("camus.sweeper.io.configurer.class")).newInstance();
            configurer.setLogger(log).configureJob(this.topicName, this.job);
            setNumOfReducersAndSplitSizes();
            submitMrJob();
            moveTmpPathToOutputPath();
        }

        /**
         * Replaces {@link #outputPath} with {@link #tmpPath}.
         *
         * <p>Existing output is first moved to {@code /tmp/_old_<jobId>} and deleted only after
         * the rename succeeds. If the rename fails, the previous output (if there was any) is
         * restored, the temporary output is deleted, and a {@link RuntimeException} is thrown.
         */
        public void moveTmpPathToOutputPath() throws IOException {
            Path oldPath = null;
            if (this.fs.exists(this.outputPath)) {
                oldPath = new Path("/tmp", "_old_" + this.job.getJobID());
                moveExistingContentInOutputPathToOldPath(oldPath);
            }
            log.info("Moving " + this.tmpPath + " to " + this.outputPath);
            CamusSweeper.this.mkdirs(this.fs, this.outputPath.getParent(), CamusSweeper.this.perm, this.job.getConfiguration());
            if (this.fs.rename(this.tmpPath, this.outputPath)) {
                deleteOldPath(oldPath);
                return;
            }
            // Roll back. Only restore previous content if some was actually moved aside;
            // renaming from a null path would throw and hide the real failure.
            if (oldPath != null) {
                this.fs.rename(oldPath, this.outputPath);
            }
            this.fs.delete(this.tmpPath, true);
            throw new RuntimeException("Error: cannot rename " + this.tmpPath + " to " + this.outputPath);
        }

        /** Recursively deletes {@code path} if it is non-null and exists. */
        private void deleteOldPath(Path path) throws IOException {
            if (path == null || !this.fs.exists(path)) {
                return;
            }
            log.info("Deleting " + path);
            this.fs.delete(path, true);
        }

        /**
         * Moves the current output aside to {@code path}. On failure, deletes the temporary
         * output and throws.
         */
        private void moveExistingContentInOutputPathToOldPath(Path path) throws IOException {
            log.info("Path " + this.outputPath + " exists. Overwriting. Existing content will be moved to " + path);
            if (this.fs.rename(this.outputPath, path)) {
                return;
            }
            this.fs.delete(this.tmpPath, true);
            throw new RuntimeException("Error: cannot rename " + this.outputPath + " to " + path);
        }

        /**
         * Chooses the number of output files from the total input size.
         *
         * <p>The count is one per {@link #targetFileSize} bytes plus one, capped at
         * {@code max.files} and never below 1. The chosen count then sets either the reducer
         * count or, for map-only jobs, the split size.
         */
        public void setNumOfReducersAndSplitSizes() throws IOException {
            long inputSize = getInputSize();
            int maxFiles = this.job.getConfiguration().getInt(MAX_FILES, DEFAULT_MAX_FILES);
            long wanted = inputSize / this.targetFileSize + 1;
            // Clamp to >= 1 so a misconfigured max.files (0 or negative) cannot cause a
            // division by zero below.
            int numFiles = (int) Math.max(1L, Math.min(wanted, (long) maxFiles));
            if (this.job.getNumReduceTasks() != 0) {
                determineAndSetNumOfReducers(numFiles);
            } else {
                setSplitSizes(inputSize / numFiles);
            }
        }

        /** Pins both the minimum and maximum split size to {@code splitSize} bytes. */
        private void setSplitSizes(long splitSize) {
            log.info("Setting target split size " + splitSize);
            this.job.getConfiguration().setLong(MAPRED_MAX_SPLIT_SIZE, splitSize);
            this.job.getConfiguration().setLong(MAPRED_MIN_SPLIT_SIZE, splitSize);
        }

        /** Uses {@code reducer.count} when it is set, otherwise {@code numFiles}, as the reducer count. */
        private void determineAndSetNumOfReducers(int numFiles) {
            Configuration conf = this.job.getConfiguration();
            int reducers = conf.get(REDUCER_COUNT) != null
                    ? conf.getInt(REDUCER_COUNT, DEFAULT_REDUCER_COUNT)
                    : numFiles;
            this.job.setNumReduceTasks(reducers);
        }

        /** Sums the content length of all input paths. Each path is queried once. */
        private long getInputSize() throws IOException {
            long total = 0;
            for (Path path : this.inputPaths) {
                long length = this.fs.getContentSummary(path).getLength();
                log.info("inputPath: " + path.toString() + ", size=" + length);
                total += length;
            }
            return total;
        }

        /**
         * Submits the job, records it so {@link CamusSweeper#cancel()} can kill it, and blocks
         * until it completes.
         *
         * @throws RuntimeException if the job does not succeed
         */
        protected void submitMrJob() throws IOException, InterruptedException, ClassNotFoundException {
            this.job.submit();
            CamusSweeper.this.runningJobs.add(this.job);
            log.info("job running for: " + this.job.getJobName() + ", url: " + this.job.getTrackingURL());
            this.job.waitForCompletion(false);
            if (!this.job.isSuccessful()) {
                throw new RuntimeException("hadoop job failed.");
            }
        }

        protected String getJobName() {
            return this.jobName;
        }

        protected String getTopicName() {
            return this.topicName;
        }

        public Job getJob() {
            return this.job;
        }

        protected Properties getProps() {
            return this.props;
        }
    }

    /**
     * Executor task wrapping one {@link KafkaCollector}.
     *
     * <p>Any failure, including planner cancellation, is logged and appended to the shared error
     * queue instead of being thrown.
     */
    public class KafkaCollectorRunner implements Runnable, PriorityExecutor.Important {
        protected Properties props;
        protected String name;
        protected List<SweeperError> errorQueue;
        protected String topic;
        protected int priority;

        /**
         * @param name unique job name
         * @param properties job properties (must contain {@code input.paths})
         * @param errorQueue shared, thread-safe list receiving failures
         * @param topic topic name, used to look up the priority
         */
        public KafkaCollectorRunner(String name, Properties properties, List<SweeperError> errorQueue, String topic) {
            this.name = name;
            this.props = properties;
            this.errorQueue = errorQueue;
            this.topic = topic;
            this.priority = CamusSweeper.this.priorityTopics.containsKey(topic)
                    ? CamusSweeper.this.priorityTopics.get(topic).intValue()
                    : 0;
        }

        @Override
        public void run() {
            KafkaCollector collector = null;
            try {
                log.info("Starting runner for " + this.name);
                collector = new KafkaCollector(this.props, this.name, this.topic);
                log.info("Waiting until input for job " + this.name + " is ready. Input directories:  " + this.props.getProperty(INPUT_PATHS));
                if (!CamusSweeper.this.planner.waitUntilReadyToProcess(this.props, CamusSweeper.this.fileSystem)) {
                    throw new JobCancelledException("Job has been cancelled by planner while waiting for input to be ready.");
                }
                log.info("Running " + this.name + " for input " + this.props.getProperty(INPUT_PATHS));
                collector.run();
            } catch (Throwable th) {
                // The collector may be null if construction itself failed. The old code
                // dereferenced it unconditionally, threw an NPE here, and lost the error.
                Job failedJob = collector == null ? null : collector.getJob();
                log.error("Failed for " + this.name + ", job: " + failedJob + " failed for "
                        + this.props.getProperty(INPUT_PATHS) + " Exception:" + th.getLocalizedMessage(), th);
                this.errorQueue.add(new SweeperError(this.name, String.valueOf(this.props.get(INPUT_PATHS)), th));
            }
        }

        @Override
        public int getPriority() {
            return this.priority;
        }
    }

    /** Immutable record of a failed sweeper job. */
    public static class SweeperError {
        protected final String topic;
        protected final String input;
        protected final Throwable e;

        public SweeperError(String topic, String input, Throwable e) {
            this.topic = topic;
            this.input = input;
            this.e = e;
        }

        public String getTopic() {
            return this.topic;
        }

        public String getInputPath() {
            return this.input;
        }

        public Throwable getException() {
            return this.e;
        }
    }

    /**
     * Accepts a path whose location relative to a root directory, with '/' replaced by '.',
     * fully matches the whitelist and does not match the blacklist. Paths whose final name
     * starts with '.' or '_' are always rejected.
     */
    public static class WhiteBlackListPathFilter implements PathFilter {
        private Pattern whitelist;
        private Pattern blacklist;
        private int rootLength;

        /**
         * @param whitelist regexes to accept; empty means accept everything
         * @param blacklist regexes to reject; empty means reject nothing
         * @param root root directory that accepted paths are relative to
         */
        public WhiteBlackListPathFilter(Collection<String> whitelist, Collection<String> blacklist, Path root) {
            this.whitelist = whitelist.isEmpty() ? Pattern.compile(".*") : compileMultiPattern(whitelist);
            // "a^" can never match, so an empty blacklist rejects nothing.
            this.blacklist = blacklist.isEmpty() ? Pattern.compile("a^") : compileMultiPattern(blacklist);
            log.info("whitelist: " + this.whitelist.toString());
            log.info("blacklist: " + this.blacklist.toString());
            // +1 skips the '/' that separates the root from the relative part.
            this.rootLength = root.toString().length() + 1;
        }

        @Override
        public boolean accept(Path path) {
            String name = path.getName();
            String relative = path.toString().substring(this.rootLength).replaceAll("/", ".");
            if (name.startsWith(".") || name.startsWith("_")) {
                return false;
            }
            return this.whitelist.matcher(relative).matches() && !this.blacklist.matcher(relative).matches();
        }

        /** Joins the non-empty collection of regexes into a single {@code (a|b|...)} alternation. */
        private Pattern compileMultiPattern(Collection<String> patterns) {
            StringBuilder regex = new StringBuilder("(");
            for (String pattern : patterns) {
                regex.append(pattern).append('|');
            }
            regex.setLength(regex.length() - 1); // drop the trailing '|'
            return Pattern.compile(regex.append(')').toString());
        }
    }

    /** Creates an unconfigured sweeper; properties are supplied later via {@link #run(String[])}. */
    public CamusSweeper() {
        this.perm = new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);
        this.priorityTopics = new HashMap<String, Integer>();
        this.props = new Properties();
    }

    /** Creates a sweeper configured from {@code properties}. */
    public CamusSweeper(Properties properties) {
        this.perm = new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);
        this.priorityTopics = new HashMap<String, Integer>();
        this.props = properties;
        init();
    }

    /** Reads priorities, subdirectories, time zone and the planner class from {@link #props}. */
    private void init() {
        for (String entry : this.props.getProperty(CAMUS_SWEEPER_PRIORITY_LIST, "").split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // unset property or stray comma
            }
            String[] parts = trimmed.split("=");
            int priority = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : 1;
            this.priorityTopics.put(parts[0].trim(), Integer.valueOf(priority));
        }
        this.errorMessages = Collections.synchronizedList(new ArrayList<SweeperError>());
        DateTimeZone.setDefault(DateTimeZone.forID(this.props.getProperty("default.timezone")));
        this.runningJobs = Collections.synchronizedList(new ArrayList<Job>());
        this.sourceSubdir = this.props.getProperty("camus.sweeper.source.subdir");
        this.destSubdir = this.props.getProperty("camus.sweeper.dest.subdir");
        try {
            this.planner = ((CamusSweeperPlanner) Class.forName(this.props.getProperty("camus.sweeper.planner.class")).newInstance())
                    .setPropertiesLogger(this.props, log);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the executor and kills every submitted job that has not completed.
     * Safe to call before {@link #run()}.
     */
    public void cancel() throws Exception {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
        if (this.runningJobs == null) {
            return;
        }
        // Iterating a synchronizedList requires holding its lock. Copy it under the lock so
        // the remote kill calls below do not block the worker threads.
        List<Job> snapshot;
        synchronized (this.runningJobs) {
            snapshot = new ArrayList<Job>(this.runningJobs);
        }
        for (Job job : snapshot) {
            if (!job.isComplete()) {
                try {
                    job.killJob();
                } catch (IOException e) {
                    log.error("Failed to kill job " + job.getJobName(), e);
                }
            }
        }
    }

    /**
     * Finds every directory below {@code path} that contains a subdirectory named
     * {@code subdirName} and is accepted by {@code pathFilter}.
     *
     * @return map from each matching directory's status to its path relative to {@code path},
     *     with '/' replaced by '.' (the topic name)
     */
    public Map<FileStatus, String> findAllTopics(Path path, PathFilter pathFilter, String subdirName, FileSystem fileSystem) throws IOException {
        Map<FileStatus, String> topics = new HashMap<FileStatus, String>();
        for (FileStatus child : fileSystem.listStatus(path)) {
            findAllTopics(child.getPath(), pathFilter, subdirName, "", fileSystem, topics);
        }
        return topics;
    }

    /**
     * Recursive step of {@link #findAllTopics(Path, PathFilter, String, FileSystem)}.
     * {@code prefix} is the dotted name of {@code path}'s ancestors below the root.
     */
    private void findAllTopics(Path path, PathFilter pathFilter, String subdirName, String prefix, FileSystem fileSystem, Map<FileStatus, String> topics) throws IOException {
        for (FileStatus child : fileSystem.listStatus(path)) {
            if (!child.isDir()) {
                continue;
            }
            Path parent = child.getPath().getParent();
            String topicName = (prefix.isEmpty() ? "" : prefix + ".") + parent.getName();
            if (child.getPath().getName().equals(subdirName) && pathFilter.accept(parent)) {
                topics.put(fileSystem.getFileStatus(parent), topicName);
            } else {
                findAllTopics(child.getPath(), pathFilter, subdirName, topicName, fileSystem, topics);
            }
        }
    }

    /** Creates the priority executor with {@code num.threads} threads (default 5). */
    public void createExecutorService() {
        this.executorService = new PriorityExecutor(Integer.parseInt(this.props.getProperty("num.threads", DEFAULT_NUM_THREADS)));
    }

    /**
     * Runs the sweeper. It discovers topics, submits one collector per planned job, waits for
     * all of them to finish, then throws if any failed.
     *
     * @throws RuntimeException "Sweeper Failed" when at least one job reported an error
     */
    public void run() throws Exception {
        log.info("Starting kafka sweeper");
        createExecutorService();
        String fromLocation = this.props.getProperty("camus.sweeper.source.dir");
        String destLocation = this.props.getProperty("camus.sweeper.dest.dir", "");
        String tmpLocation = this.props.getProperty("camus.sweeper.tmp.dir", "");
        if (destLocation.isEmpty()) {
            destLocation = fromLocation;
        }
        if (tmpLocation.isEmpty()) {
            tmpLocation = "/tmp";
        }
        this.props.setProperty("camus.sweeper.tmp.dir", tmpLocation);
        log.info("fromLocation: " + fromLocation);
        log.info("destLocation: " + destLocation);
        List<String> blacklist = Utils.getStringList(this.props, "camus.sweeper.blacklist");
        List<String> whitelist = Utils.getStringList(this.props, "camus.sweeper.whitelist");
        Configuration configuration = new Configuration();
        for (Map.Entry<Object, Object> entry : this.props.entrySet()) {
            configuration.set(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
        }
        this.fileSystem = FileSystem.get(configuration);
        createTmpDirIfMissing(new Path(tmpLocation), configuration);

        Path sourcePath = new Path(fromLocation);
        PathFilter filter = new WhiteBlackListPathFilter(whitelist, blacklist, this.fileSystem.getFileStatus(sourcePath).getPath());
        Map<FileStatus, String> topics = findAllTopics(sourcePath, filter, this.sourceSubdir, this.fileSystem);
        for (Map.Entry<FileStatus, String> topicEntry : topics.entrySet()) {
            String topic = topicEntry.getValue();
            log.info("Processing topic " + topic);
            try {
                Path source = new Path(topicEntry.getKey().getPath(), this.sourceSubdir);
                Path dest = new Path(destLocation + "/" + topic.replace(".", "/") + "/" + this.destSubdir);
                runCollectorForTopicDir(this.fileSystem, topic, source, dest);
            } catch (Exception e) {
                // A single bad topic must not prevent the others from being swept.
                log.error("unable to process " + topic + " skipping...", e);
            }
        }
        awaitJobsAndReportErrors();
    }

    /**
     * Creates the shared temporary directory with {@link #perm} if it is missing, fixes its
     * permission if the umask altered it, and sets its owner and group to the current user name.
     */
    private void createTmpDirIfMissing(Path tmpPath, Configuration configuration) throws IOException {
        if (this.fileSystem.exists(tmpPath)) {
            return;
        }
        this.fileSystem.mkdirs(tmpPath, this.perm);
        log.info("Created tmpPath " + tmpPath + " with permissions " + this.perm + " and umask " + getUmask(configuration));
        FsPermission actual = this.fileSystem.getFileStatus(tmpPath).getPermission();
        if (!actual.equals(this.perm)) {
            log.error(String.format("Wrong permission for %s. Expects %s, actual %s", tmpPath, this.perm, actual));
            this.fileSystem.setPermission(tmpPath, this.perm);
        }
        String userName = UserGroupInformation.getCurrentUser().getUserName();
        this.fileSystem.setOwner(tmpPath, userName, userName);
    }

    /** Waits for all submitted collectors to finish, then throws if any of them reported an error. */
    private void awaitJobsAndReportErrors() throws InterruptedException {
        log.info("Shutting down priority executor");
        this.executorService.shutdown();
        while (!this.executorService.isTerminated()) {
            this.executorService.awaitTermination(30L, TimeUnit.SECONDS);
        }
        log.info("Shutting down");
        if (this.errorMessages.isEmpty()) {
            return;
        }
        for (SweeperError error : this.errorMessages) {
            log.error("Error occurred in " + error.getTopic() + " at " + error.getInputPath()
                    + " message " + error.getException().getMessage(), error.getException());
        }
        throw new RuntimeException("Sweeper Failed");
    }

    /**
     * Returns the configured HDFS umask for logging. {@code dfs.umaskmode} wins over
     * {@code dfs.umask}; returns "undefined" when neither is set.
     */
    private static String getUmask(Configuration configuration) {
        String umaskMode = configuration.get("dfs.umaskmode");
        String umask = configuration.get("dfs.umask");
        if (umaskMode != null && umask != null) {
            log.warn(String.format("Both umask labels exist: %s=%s, %s=%s", "dfs.umaskmode", umaskMode, "dfs.umask", umask));
            return umaskMode;
        }
        if (umaskMode != null) {
            log.info(String.format("umask set: %s=%s", "dfs.umaskmode", umaskMode));
            return umaskMode;
        }
        if (umask != null) {
            log.info(String.format("umask set: %s=%s", "dfs.umask", umask));
            return umask;
        }
        log.info("umask unset");
        return "undefined";
    }

    /** Asks the planner for the jobs of one topic and submits a collector for each. */
    protected void runCollectorForTopicDir(FileSystem fileSystem, String topic, Path source, Path dest) throws Exception {
        log.info("Running collector for topic " + topic + " source:" + source + " dest:" + dest);
        for (Properties jobProps : this.planner.createSweeperJobProps(topic, source, dest, fileSystem)) {
            runCollector(jobProps, topic);
        }
        log.info("Finishing processing for topic " + topic);
    }

    /**
     * Assigns a unique job name and temporary output path, applies any
     * {@code reduce.count.override.<topic>} setting, and submits a {@link KafkaCollectorRunner}.
     *
     * @return the executor's future for the submitted runner
     */
    protected Future<?> runCollector(Properties properties, String topic) {
        String jobName = topic + "-" + UUID.randomUUID().toString();
        properties.setProperty(TMP_PATH, properties.getProperty("camus.sweeper.tmp.dir") + "/" + jobName + "_" + System.currentTimeMillis());
        String overrideKey = "reduce.count.override." + topic;
        if (properties.containsKey(overrideKey)) {
            // Parse to validate, but store as a String. KafkaCollector copies every property
            // into the job Configuration as a String, and a boxed Integer there would fail.
            int reducers = Integer.parseInt(properties.getProperty(overrideKey));
            properties.setProperty(REDUCER_COUNT, Integer.toString(reducers));
        }
        log.info("Processing " + properties.get(INPUT_PATHS));
        return this.executorService.submit(new KafkaCollectorRunner(jobName, properties, this.errorMessages, topic));
    }

    /**
     * Creates {@code path} and any missing ancestors with {@code fsPermission}, re-applying the
     * permission if the umask changed it.
     *
     * @throws IOException if the file system reports that the directory could not be created
     */
    protected void mkdirs(FileSystem fileSystem, Path path, FsPermission fsPermission, Configuration configuration) throws IOException {
        Path parent = path.getParent();
        if (parent != null && !fileSystem.exists(parent)) {
            mkdirs(fileSystem, parent, fsPermission, configuration);
        }
        String message = "Creating " + path + " with permissions " + fsPermission + " and umask " + getUmask(configuration);
        if (!fileSystem.exists(path)) {
            log.info(message);
        }
        if (!fileSystem.mkdirs(path, fsPermission)) {
            String failure = message + " failed";
            log.error(failure);
            throw new IOException(failure);
        }
        FsPermission actual = fileSystem.getFileStatus(path).getPermission();
        if (actual.equals(fsPermission)) {
            return;
        }
        log.error(String.format("Wrong permission for %s. Expects %s, actual %s", path, fsPermission, actual));
        fileSystem.setPermission(path, fsPermission);
    }

    /**
     * Tool entry point. Loads properties from {@code -p <classpath resource>} and/or
     * {@code -P <file>}, then applies {@code -D key=value} overrides and runs the sweeper.
     *
     * @return 1 when neither -p nor -P is given (usage is printed), otherwise 0
     * @throws IOException if the named classpath resource does not exist
     */
    @Override
    public int run(String[] strArr) throws Exception {
        Options options = new Options();
        options.addOption("p", true, "properties filename from the classpath");
        options.addOption("P", true, "external properties filename");
        OptionBuilder.withArgName("property=value");
        OptionBuilder.hasArgs(2);
        OptionBuilder.withValueSeparator();
        OptionBuilder.withDescription("use value for given property");
        options.addOption(OptionBuilder.create("D"));
        CommandLine parse = new PosixParser().parse(options, strArr);
        if (!parse.hasOption('p') && !parse.hasOption('P')) {
            new HelpFormatter().printHelp("CamusJob.java", options);
            return 1;
        }
        if (parse.hasOption('p')) {
            String resource = parse.getOptionValue('p');
            try (InputStream in = getClass().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new IOException("Properties resource not found on classpath: " + resource);
                }
                this.props.load(in);
            }
        }
        if (parse.hasOption('P')) {
            try (InputStream in = new FileInputStream(new File(parse.getOptionValue('P')))) {
                this.props.load(in);
            }
        }
        this.props.putAll(parse.getOptionProperties("D"));
        init();
        run();
        return 0;
    }

    public static void main(String[] strArr) throws Exception {
        ToolRunner.run(new CamusSweeper(), strArr);
    }
}
