package com.linkedin.camus.sweeper;

import com.linkedin.camus.sweeper.mapreduce.CamusSweeperJob;
import com.linkedin.camus.sweeper.utils.Utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;

/* loaded from: input_file:com/linkedin/camus/sweeper/CamusSweeper.class */
public class CamusSweeper extends Configured implements Tool {
    // Thread-pool size used by run() when the "num.threads" property is absent.
    private static final String DEFAULT_NUM_THREADS = "5";
    // Failures recorded by collector runners; created as a synchronized list in init()
    // and reported at the end of run().
    private List<SweeperError> errorMessages;
    // Jobs submitted so far; created as a synchronized list in init() and walked by cancel().
    private List<Job> runningJobs;
    // Sweeper configuration; also copied into the Hadoop Configuration in run().
    private Properties props;
    // Worker pool that executes the per-job runners; created in run().
    private ExecutorService executorService;
    // Permission (owner ALL, group/other READ_EXECUTE) passed when creating destination directories.
    private FsPermission perm;
    // Value of "camus.sweeper.dest.subdir"; appended to each topic's destination directory.
    private String destSubdir;
    // Value of "camus.sweeper.source.subdir"; resolved under each topic's source directory.
    private String sourceSubdir;
    // Shared log4j logger; handed to the planner and to the job configurer.
    private static Logger log = Logger.getLogger(CamusSweeper.class);
    // Planner instantiated reflectively in init() from "camus.sweeper.planner.class".
    private CamusSweeperPlanner planner;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/camus/sweeper/CamusSweeper$BlackListPathFilter.class */
    /**
     * Path filter that accepts a path when its name matches the whitelist, does not
     * match the blacklist, and does not start with "." or "_".
     */
    public class BlackListPathFilter implements PathFilter {
        private Pattern whitelist;
        private Pattern blacklist;

        /**
         * @param whitelistPatterns regexes a name must match; empty means "match everything"
         * @param blacklistPatterns regexes a name must not match; empty means "match nothing"
         */
        public BlackListPathFilter(Collection<String> whitelistPatterns, Collection<String> blacklistPatterns) {
            // ".*" matches every name.
            this.whitelist = whitelistPatterns.isEmpty()
                    ? Pattern.compile(".*")
                    : CamusSweeper.this.compileMultiPattern(whitelistPatterns);
            // "a^" can never match, so an empty blacklist rejects nothing.
            this.blacklist = blacklistPatterns.isEmpty()
                    ? Pattern.compile("a^")
                    : CamusSweeper.this.compileMultiPattern(blacklistPatterns);
        }

        /**
         * @param path candidate path; only its last component is examined
         * @return true when the name passes the whitelist, blacklist and prefix checks
         */
        public boolean accept(Path path) {
            String fileName = path.getName();
            if (fileName.startsWith(".") || fileName.startsWith("_")) {
                return false;
            }
            if (this.blacklist.matcher(fileName).matches()) {
                return false;
            }
            return this.whitelist.matcher(fileName).matches();
        }
    }

    /* loaded from: input_file:com/linkedin/camus/sweeper/CamusSweeper$KafkaCollector.class */
    /**
     * Configures, submits and waits for one sweep job, then swaps the job's temporary
     * output directory into the destination path.
     */
    private class KafkaCollector {
        // Divisor used to derive the reducer count from the total input size (1.5 GiB).
        private static final long AVERAGE_FILE_SIZE = 1610612736;
        private String jobName;
        private Properties props;
        private String topicName;

        /**
         * @param str        unused; kept so existing call sites keep compiling
         * @param properties job properties; every entry is copied into the job configuration
         * @param str2       job name
         * @param str3       topic name handed to the job configurer
         */
        public KafkaCollector(String str, Properties properties, String str2, String str3) {
            this.jobName = str2;
            this.props = properties;
            this.topicName = str3;
        }

        /** Recursively adds every file under {@code path} whose name ends with "avro" as job input. */
        private void addInputPath(Job job, Path path, FileSystem fileSystem) throws IOException {
            for (FileStatus fileStatus : fileSystem.listStatus(path)) {
                if (fileStatus.isDir()) {
                    addInputPath(job, fileStatus.getPath(), fileSystem);
                } else if (fileStatus.getPath().getName().endsWith("avro")) {
                    FileInputFormat.addInputPath(job, fileStatus.getPath());
                }
            }
        }

        /**
         * Runs the job to completion and publishes its output.
         *
         * @throws RuntimeException if the job fails or the output cannot be moved into place
         * @throws Exception        on any configuration, submission or filesystem error
         */
        public void run() throws Exception {
            Job job = new Job(CamusSweeper.this.getConf());
            job.setJarByClass(CamusSweeper.class);
            job.setJobName(this.jobName);
            // Convert instead of casting: Properties can legally hold non-String values
            // (e.g. an Integer stored through put), and a cast would throw ClassCastException.
            for (Map.Entry<Object, Object> entry : this.props.entrySet()) {
                job.getConfiguration().set(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
            }
            FileSystem fileSystem = FileSystem.get(job.getConfiguration());

            List<String> inputPathNames = Utils.getStringList(this.props, "input.paths");
            Path[] inputPaths = new Path[inputPathNames.size()];
            for (int i = 0; i < inputPaths.length; i++) {
                inputPaths[i] = new Path(inputPathNames.get(i));
            }
            for (Path inputPath : inputPaths) {
                addInputPath(job, inputPath, fileSystem);
            }

            int reducers = chooseReducerCount(job, fileSystem, inputPaths);
            System.out.println("Setting reducer " + reducers);
            job.setNumReduceTasks(reducers);
            job.getConfiguration().set("mapred.compress.map.output", "true");

            Path tmpPath = new Path(job.getConfiguration().get("tmp.path"));
            Path destPath = new Path(job.getConfiguration().get("dest.path"));
            FileOutputFormat.setOutputPath(job, tmpPath);
            ((CamusSweeperJob) Class.forName(this.props.getProperty("camus.sweeper.io.configurer.class")).newInstance()).setLogger(CamusSweeper.log).configureJob(this.topicName, job);

            job.submit();
            CamusSweeper.this.runningJobs.add(job);
            System.out.println("job running: " + job.getTrackingURL());
            job.waitForCompletion(false);
            if (!job.isSuccessful()) {
                System.err.println("hadoop job failed");
                // Propagate the failure so the caller can record it; returning quietly
                // would let the sweeper finish as if this job had succeeded.
                throw new RuntimeException("hadoop job failed: " + this.jobName);
            }
            publishOutput(fileSystem, job, tmpPath, destPath);
        }

        /**
         * Returns the configured "reducer.count" when present; otherwise one reducer per
         * AVERAGE_FILE_SIZE bytes of input (plus one), capped by "max.files" (default 24).
         */
        private int chooseReducerCount(Job job, FileSystem fileSystem, Path[] inputPaths) throws IOException {
            Configuration conf = job.getConfiguration();
            if (conf.get("reducer.count") != null) {
                return conf.getInt("reducer.count", 45);
            }
            long totalBytes = 0;
            for (Path inputPath : inputPaths) {
                totalBytes += duDirectory(fileSystem, inputPath);
            }
            return Math.min(((int) (totalBytes / AVERAGE_FILE_SIZE)) + 1, conf.getInt("max.files", 24));
        }

        /**
         * Moves {@code tmpPath} to {@code destPath}. An existing destination is first moved
         * aside to /tmp/_old_&lt;jobId&gt;, restored if the swap fails, and deleted once it succeeds.
         */
        private void publishOutput(FileSystem fileSystem, Job job, Path tmpPath, Path destPath) throws IOException {
            Path backupPath = null;
            if (fileSystem.exists(destPath)) {
                backupPath = new Path("/tmp", "_old_" + job.getJobID());
                System.out.println("Path " + destPath + " exists. Overwriting.");
                if (!fileSystem.rename(destPath, backupPath)) {
                    fileSystem.delete(tmpPath, true);
                    throw new RuntimeException("Error: cannot rename " + destPath + " to " + backupPath);
                }
            }
            System.out.println("Swapping " + tmpPath + " to " + destPath);
            CamusSweeper.this.mkdirs(fileSystem, destPath.getParent(), CamusSweeper.this.perm);
            if (!fileSystem.rename(tmpPath, destPath)) {
                // Only restore a backup if one was actually taken; there is none when the
                // destination did not exist before this run.
                if (backupPath != null) {
                    fileSystem.rename(backupPath, destPath);
                }
                fileSystem.delete(tmpPath, true);
                throw new RuntimeException("Error: cannot rename " + tmpPath + " to " + destPath);
            }
            if (backupPath != null && fileSystem.exists(backupPath)) {
                System.out.println("Deleting " + backupPath);
                fileSystem.delete(backupPath, true);
            }
        }

        /**
         * Returns the total length in bytes of all files under {@code path}, skipping any
         * entry whose name starts with "_" or ".".
         */
        private long duDirectory(FileSystem fileSystem, Path path) throws IOException {
            String name = path.getName();
            if (name.startsWith("_") || name.startsWith(".")) {
                return 0L;
            }
            FileStatus fileStatus = fileSystem.getFileStatus(path);
            if (!fileStatus.isDir()) {
                return fileStatus.getLen();
            }
            long total = 0;
            for (FileStatus child : fileSystem.listStatus(path)) {
                total += duDirectory(fileSystem, child.getPath());
            }
            return total;
        }
    }

    /* loaded from: input_file:com/linkedin/camus/sweeper/CamusSweeper$KafkaCollectorRunner.class */
    /**
     * Runnable wrapper that executes one KafkaCollector and records any exception it
     * throws in the shared error list instead of letting it escape the worker thread.
     */
    public class KafkaCollectorRunner implements Runnable {
        private Properties props;
        private String name;
        private List<SweeperError> errorQueue;
        private String topic;

        /**
         * @param str        job name
         * @param properties job properties passed to the collector
         * @param list       list that receives a SweeperError when the collector fails
         * @param str2       topic name
         */
        public KafkaCollectorRunner(String str, Properties properties, List<SweeperError> list, String str2) {
            this.name = str;
            this.props = properties;
            this.errorQueue = list;
            this.topic = str2;
        }

        /** Runs the collector; failures are printed and appended to the error list. */
        @Override // java.lang.Runnable
        public void run() {
            try {
                System.out.println("Starting runner for " + this.name);
                KafkaCollector kafkaCollector = new KafkaCollector("test", this.props, this.name, this.topic);
                System.out.println("Running " + this.name + " for input " + this.props.getProperty("input.paths"));
                kafkaCollector.run();
            } catch (Exception e) {
                System.out.println("Failed for " + this.name + " failed for " + this.props.getProperty("input.paths") + " Exception:" + e.getLocalizedMessage());
                // String.valueOf tolerates a missing "input.paths" entry. Calling toString()
                // on null here would throw inside the handler and the original failure
                // would never be recorded.
                this.errorQueue.add(new SweeperError(this.name, String.valueOf(this.props.get("input.paths")), e));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/camus/sweeper/CamusSweeper$SweeperError.class */
    /** Immutable record of one failed sweep job: its name, its input and the exception raised. */
    public static class SweeperError {
        private final String topic;
        private final String input;
        private final Exception e;

        /**
         * @param topic name under which the failure is reported
         * @param input input path description of the failed job
         * @param cause exception that caused the failure
         */
        public SweeperError(String topic, String input, Exception cause) {
            this.e = cause;
            this.input = input;
            this.topic = topic;
        }

        /** @return the exception passed to the constructor */
        public Exception getException() {
            return e;
        }

        /** @return the input path description passed to the constructor */
        public String getInputPath() {
            return input;
        }

        /** @return the name passed to the constructor */
        public String getTopic() {
            return topic;
        }
    }

    /**
     * Creates a sweeper with empty properties. init() is not called here;
     * run(String[]) loads the properties and calls it.
     */
    public CamusSweeper() {
        props = new Properties();
        perm = new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);
    }

    /**
     * Creates a sweeper from the given properties and initializes it immediately.
     *
     * @param properties sweeper configuration; stored by reference
     */
    public CamusSweeper(Properties properties) {
        props = properties;
        perm = new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);
        init();
    }

    /**
     * Resets the error and job lists, sets the default Joda time zone from
     * "default.timezone", reads the source/destination sub-directory names and
     * instantiates the planner named by "camus.sweeper.planner.class".
     *
     * @throws RuntimeException wrapping any failure to create or configure the planner
     */
    private void init() {
        this.errorMessages = Collections.synchronizedList(new ArrayList<SweeperError>());

        String timezoneId = this.props.getProperty("default.timezone");
        DateTimeZone.setDefault(DateTimeZone.forID(timezoneId));

        this.runningJobs = Collections.synchronizedList(new ArrayList<Job>());
        this.sourceSubdir = this.props.getProperty("camus.sweeper.source.subdir");
        this.destSubdir = this.props.getProperty("camus.sweeper.dest.subdir");

        try {
            Class<?> plannerClass = Class.forName(this.props.getProperty("camus.sweeper.planner.class"));
            CamusSweeperPlanner created = (CamusSweeperPlanner) plannerClass.newInstance();
            this.planner = created.setPropertiesLogger(this.props, log);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the worker pool and kills every submitted job that has not completed.
     * Safe to call before run() has started: missing executor or job list is ignored.
     *
     * @throws Exception if querying a job's completion state fails
     */
    public void cancel() throws Exception {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
        if (this.runningJobs == null) {
            return;
        }
        // Worker threads may still be adding jobs. A synchronized list must be locked
        // by the caller while iterating, so copy it under the lock and kill from the
        // copy without holding the lock during remote calls.
        List<Job> snapshot;
        synchronized (this.runningJobs) {
            snapshot = new ArrayList<Job>(this.runningJobs);
        }
        for (Job job : snapshot) {
            if (!job.isComplete()) {
                try {
                    job.killJob();
                } catch (IOException e) {
                    // Best effort: keep killing the remaining jobs.
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Sweeps every accepted topic directory under "camus.sweeper.source.dir": plans and
     * submits the jobs for each topic, waits for the worker pool to drain, then reports
     * the recorded errors.
     *
     * @throws RuntimeException "Sweeper Failed" when at least one job recorded an error
     * @throws Exception        on configuration or filesystem errors outside a topic
     */
    public void run() throws Exception {
        System.out.println("Starting kafka sweeper");
        int threadCount = Integer.parseInt(this.props.getProperty("num.threads", DEFAULT_NUM_THREADS));
        this.executorService = Executors.newFixedThreadPool(threadCount);

        String sourceDir = this.props.getProperty("camus.sweeper.source.dir");
        String configuredDestDir = this.props.getProperty("camus.sweeper.dest.dir", "");
        // An empty destination means "write back under the source directory".
        String destDir = configuredDestDir.isEmpty() ? sourceDir : configuredDestDir;

        List<String> blacklist = Utils.getStringList(this.props, "camus.sweeper.blacklist");
        List<String> whitelist = Utils.getStringList(this.props, "camus.sweeper.whitelist");

        Configuration conf = new Configuration();
        for (Map.Entry<Object, Object> property : this.props.entrySet()) {
            conf.set((String) property.getKey(), (String) property.getValue());
        }
        FileSystem fs = FileSystem.get(conf);

        Path sourceRoot = new Path(sourceDir);
        PathFilter topicFilter = new BlackListPathFilter(whitelist, blacklist);
        for (FileStatus topicDir : fs.listStatus(sourceRoot, topicFilter)) {
            String topic = topicDir.getPath().getName();
            System.out.println("Processing topic " + topic);
            try {
                Path topicSource = new Path(topicDir.getPath(), this.sourceSubdir);
                Path topicDest = new Path(destDir + "/" + topicDir.getPath().getName() + "/" + this.destSubdir);
                runCollectorForTopicDir(fs, topic, topicSource, topicDest);
            } catch (Exception e) {
                // One bad topic must not stop the remaining topics.
                System.err.println("unable to process " + topic + " skipping...");
                e.printStackTrace();
            }
        }

        System.out.println("Shutting down executor");
        this.executorService.shutdown();
        while (!this.executorService.isTerminated()) {
            this.executorService.awaitTermination(30L, TimeUnit.SECONDS);
        }
        System.out.println("Shutting down");

        if (!this.errorMessages.isEmpty()) {
            for (SweeperError failure : this.errorMessages) {
                System.err.println("Error occurred in " + failure.getTopic() + " at " + failure.getInputPath().toString() + " message " + failure.getException().getMessage());
                failure.getException().printStackTrace();
            }
            throw new RuntimeException("Sweeper Failed");
        }
    }

    /**
     * Asks the planner for the job properties of one topic and submits a collector for
     * each of them.
     *
     * @param fileSystem filesystem handed to the planner
     * @param str        topic name
     * @param path       topic source directory
     * @param path2      topic destination directory
     * @throws Exception if planning fails
     */
    private void runCollectorForTopicDir(FileSystem fileSystem, String str, Path path, Path path2) throws Exception {
        System.out.println("Running collector for topic " + str + " source:" + path + " dest:" + path2);
        List<Future> submitted = new ArrayList<Future>();
        for (Properties jobProps : this.planner.createSweeperJobProps(str, path, path2, fileSystem)) {
            Future handle = runCollector(jobProps, str);
            submitted.add(handle);
        }
        System.out.println("Finishing processing for topic " + str);
    }

    /**
     * Prepares the properties for one job and submits a runner for it to the worker pool.
     * Sets "tmp.path" to a unique directory under /tmp and, when a
     * "reduce.count.override.&lt;topic&gt;" entry exists, copies it to "reducer.count".
     *
     * @param properties job properties; modified in place
     * @param str        topic name
     * @return the future of the submitted runner
     * @throws NumberFormatException if the reducer override is not a valid integer
     */
    private Future runCollector(Properties properties, String str) {
        String jobName = str + "-" + UUID.randomUUID().toString();
        properties.setProperty("tmp.path", "/tmp/" + jobName + "_" + System.currentTimeMillis());

        String overrideKey = "reduce.count.override." + str;
        if (properties.containsKey(overrideKey)) {
            // Parse to validate, but store as a String: Properties values that are not
            // Strings are invisible to getProperty and break code that casts them.
            int reducers = Integer.parseInt(properties.getProperty(overrideKey));
            properties.setProperty("reducer.count", Integer.toString(reducers));
        }

        System.out.println("Processing " + properties.get("input.paths"));
        return this.executorService.submit(new KafkaCollectorRunner(jobName, properties, this.errorMessages, str));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates {@code path} and any missing ancestors one level at a time, passing
     * {@code fsPermission} to each mkdirs call. Each visited path is printed to stderr.
     *
     * @param fileSystem   filesystem to create the directories on
     * @param path         directory to create
     * @param fsPermission permission passed to every mkdirs call
     * @throws IOException if the filesystem reports an error
     */
    public void mkdirs(FileSystem fileSystem, Path path, FsPermission fsPermission) throws IOException {
        System.err.println(path.toString());
        // The root has no parent; stop recursing there instead of passing null on.
        Path parent = path.getParent();
        if (parent != null && !fileSystem.exists(parent)) {
            mkdirs(fileSystem, parent, fsPermission);
        }
        fileSystem.mkdirs(path, fsPermission);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Compiles the given regexes into one alternation of the form {@code (a|b|c)}.
     *
     * @param collection regex fragments to combine
     * @return the compiled alternation; for an empty collection, a pattern that matches nothing
     * @throws java.util.regex.PatternSyntaxException if the combined expression is invalid
     */
    public Pattern compileMultiPattern(Collection<String> collection) {
        if (collection.isEmpty()) {
            // An alternation with no branches matches nothing. Building "()" minus its
            // last character would yield the invalid regex ")".
            return Pattern.compile("a^");
        }
        StringBuilder alternation = new StringBuilder("(");
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            alternation.append(it.next());
            if (it.hasNext()) {
                alternation.append('|');
            }
        }
        alternation.append(')');
        return Pattern.compile(alternation.toString());
    }

    /**
     * Command-line entry point used by ToolRunner. Loads properties from a classpath
     * resource (-p) and/or an external file (-P), applies -D overrides, then initializes
     * and runs the sweeper.
     *
     * @param strArr command-line arguments
     * @return 1 when neither -p nor -P is given (usage is printed), otherwise 0
     * @throws IOException if the -p resource is not on the classpath or a properties source cannot be read
     * @throws Exception   on argument parsing errors or any sweeper failure
     */
    public int run(String[] strArr) throws Exception {
        Options options = new Options();
        options.addOption("p", true, "properties filename from the classpath");
        options.addOption("P", true, "external properties filename");
        OptionBuilder.withArgName("property=value");
        OptionBuilder.hasArgs(2);
        OptionBuilder.withValueSeparator();
        OptionBuilder.withDescription("use value for given property");
        options.addOption(OptionBuilder.create("D"));
        CommandLine parse = new PosixParser().parse(options, strArr);
        if (!parse.hasOption('p') && !parse.hasOption('P')) {
            new HelpFormatter().printHelp("CamusSweeper.java", options);
            return 1;
        }
        if (parse.hasOption('p')) {
            String resourceName = parse.getOptionValue('p');
            java.io.InputStream resource = getClass().getResourceAsStream(resourceName);
            if (resource == null) {
                // getResourceAsStream returns null for a missing resource; report it by
                // name instead of failing later with a bare NullPointerException.
                throw new IOException("Properties resource not found on classpath: " + resourceName);
            }
            loadPropertiesAndClose(resource);
        }
        if (parse.hasOption('P')) {
            loadPropertiesAndClose(new FileInputStream(new File(parse.getOptionValue('P'))));
        }
        // -D overrides are applied last so they win over both files.
        this.props.putAll(parse.getOptionProperties("D"));
        init();
        run();
        return 0;
    }

    /** Loads properties from the stream into this.props and always closes the stream. */
    private void loadPropertiesAndClose(java.io.InputStream in) throws IOException {
        try {
            this.props.load(in);
        } finally {
            in.close();
        }
    }

    /**
     * Launches the sweeper through Hadoop's ToolRunner.
     *
     * @param strArr command-line arguments, forwarded unchanged
     * @throws Exception any failure raised while running the tool
     */
    public static void main(String[] strArr) throws Exception {
        CamusSweeper sweeper = new CamusSweeper();
        ToolRunner.run(sweeper, strArr);
    }
}
