package com.linkedin.camus.etl.kafka;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.utils.Utils;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/KafkaCluster.class */
public class KafkaCluster {
    // Source of the random numeric suffix that getTempDir() appends to temp directory names.
    private static final Random RANDOM = new Random();
    // Name prefix of every temporary directory created by getTempDir().
    private static final String TEMP_DIR_PREFIX = "camus-";
    // Embedded ZooKeeper instance created by the constructor; every broker is configured to connect to it.
    private final EmbeddedZookeeper zookeeper;
    // Brokers started by the constructor, in creation order; shutdown() stops each of them.
    private final List<KafkaServer> brokers;
    // Copy of the caller-supplied properties plus "metadata.broker.list" and "zookeeper.connect".
    private final Properties props;

    /* loaded from: input_file:com/linkedin/camus/etl/kafka/KafkaCluster$EmbeddedZookeeper.class */
    /**
     * In-process ZooKeeper server bound to a free localhost port, storing its
     * snapshots and transaction logs in freshly created temporary directories.
     */
    private static class EmbeddedZookeeper {
        private static final String HOST = "localhost";
        /** Second argument passed to the connection factory (connection limit). */
        private static final int MAX_CLIENT_CONNECTIONS = 1024;
        /** Third argument passed to the ZooKeeper server (tick time). */
        private static final int TICK_TIME = 500;

        private final int port;
        private final File snapshotDir;
        private final File logDir;
        private final NIOServerCnxn.Factory factory;

        /**
         * Picks a free port, creates the data directories and starts the server.
         * If any step fails, everything acquired so far is released before the
         * failure is propagated, so a failed construction leaves nothing behind.
         *
         * @throws IOException if the port cannot be obtained, the server cannot
         *         be started, or the calling thread is interrupted during startup
         *         (the interrupt status is restored in that case)
         */
        public EmbeddedZookeeper() throws IOException {
            // Acquire into locals first: the final fields are only assigned once
            // the server is known to be running.
            int chosenPort = KafkaCluster.access$000();
            File snapshots = null;
            File logs = null;
            NIOServerCnxn.Factory cnxnFactory = null;
            boolean started = false;
            boolean interrupted = false;
            try {
                snapshots = KafkaCluster.access$100();
                logs = KafkaCluster.access$100();
                cnxnFactory = new NIOServerCnxn.Factory(new InetSocketAddress(HOST, chosenPort), MAX_CLIENT_CONNECTIONS);
                cnxnFactory.startup(new ZooKeeperServer(snapshots, logs, TICK_TIME));
                started = true;
            } catch (InterruptedException e) {
                interrupted = true;
                throw new IOException(e);
            } finally {
                if (!started) {
                    releaseQuietly(cnxnFactory, snapshots, logs);
                }
                if (interrupted) {
                    // Restored only after cleanup so the cleanup itself is not cut short.
                    Thread.currentThread().interrupt();
                }
            }
            this.port = chosenPort;
            this.snapshotDir = snapshots;
            this.logDir = logs;
            this.factory = cnxnFactory;
        }

        /**
         * Stops the server and deletes its data directories. The directories are
         * removed even if stopping the connection factory throws.
         */
        public void shutdown() {
            try {
                this.factory.shutdown();
            } finally {
                Utils.rm(this.snapshotDir);
                Utils.rm(this.logDir);
            }
        }

        /**
         * @return the {@code host:port} string clients use to connect to this server
         */
        public String getConnection() {
            return HOST + ":" + this.port;
        }

        /**
         * Best-effort release of partially acquired resources after a failed
         * startup; any argument may be {@code null} if it was never acquired.
         */
        private static void releaseQuietly(NIOServerCnxn.Factory cnxnFactory, File snapshots, File logs) {
            try {
                if (cnxnFactory != null) {
                    cnxnFactory.shutdown();
                }
            } catch (RuntimeException ignored) {
                // The startup failure that triggered this cleanup is the error worth reporting.
            } finally {
                if (snapshots != null) {
                    Utils.rm(snapshots);
                }
                if (logs != null) {
                    Utils.rm(logs);
                }
            }
        }
    }

    /* loaded from: input_file:com/linkedin/camus/etl/kafka/KafkaCluster$SystemTime.class */
    /**
     * {@link Time} implementation backed by the JVM's system clock.
     */
    public static class SystemTime implements Time {
        /**
         * @return the current wall-clock time from {@link System#currentTimeMillis()}
         */
        public long milliseconds() {
            return System.currentTimeMillis();
        }

        /**
         * @return the current value of {@link System#nanoTime()}
         */
        public long nanoseconds() {
            return System.nanoTime();
        }

        /**
         * Sleeps the calling thread for the given number of milliseconds. If the
         * thread is interrupted the method returns early with the thread's
         * interrupt status set, so callers can still observe the interruption.
         *
         * @param j duration to sleep, in milliseconds
         */
        public void sleep(long j) {
            try {
                Thread.sleep(j);
            } catch (InterruptedException e) {
                // This method declares no checked exceptions, so the interrupt is
                // re-asserted on the thread instead of being silently discarded.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates a cluster with an empty base configuration by delegating to
     * {@code KafkaCluster(Properties)} with a new, empty {@link Properties}.
     *
     * @throws IOException propagated from the delegated constructor
     */
    public KafkaCluster() throws IOException {
        this(new Properties());
    }

    /**
     * Creates a single-broker cluster by delegating to
     * {@code KafkaCluster(Properties, int)} with a broker count of 1.
     *
     * @param properties base configuration handed to the delegated constructor
     * @throws IOException propagated from the delegated constructor
     */
    public KafkaCluster(Properties properties) throws IOException {
        this(properties, 1);
    }

    /**
     * Starts an embedded ZooKeeper server and {@code i} Kafka brokers on
     * localhost. Each broker gets its own free port, its own temporary log
     * directory and the id {@code 1..i}, and is pointed at the embedded
     * ZooKeeper. The caller's properties are copied into every broker
     * configuration and into the properties returned by {@link #getProps()},
     * which additionally carry {@code metadata.broker.list} and
     * {@code zookeeper.connect}.
     *
     * <p>If any broker fails to start, the brokers already running and the
     * ZooKeeper server are stopped before the failure is propagated.
     *
     * @param properties base configuration; must not be {@code null}
     * @param i number of brokers to start; must be at least 1
     * @throws IOException if ZooKeeper cannot be started or a free port cannot be obtained
     * @throws NullPointerException if {@code properties} is {@code null}
     * @throws IllegalArgumentException if {@code i} is less than 1
     */
    public KafkaCluster(Properties properties, int i) throws IOException {
        // Validate before starting anything so a bad argument cannot leave a
        // ZooKeeper server running with no handle to stop it.
        if (properties == null) {
            throw new NullPointerException("properties must not be null");
        }
        if (i < 1) {
            throw new IllegalArgumentException("broker count must be at least 1, got " + i);
        }

        this.zookeeper = new EmbeddedZookeeper();
        this.brokers = new ArrayList<KafkaServer>();
        this.props = new Properties();
        this.props.putAll(properties);

        StringBuilder brokerList = new StringBuilder();
        boolean started = false;
        try {
            for (int index = 0; index < i; index++) {
                int brokerPort = getAvailablePort();
                if (brokerList.length() > 0) {
                    brokerList.append(",");
                }
                brokerList.append("localhost:").append(brokerPort);

                Properties brokerProps = new Properties();
                brokerProps.putAll(properties);
                brokerProps.setProperty("zookeeper.connect", this.zookeeper.getConnection());
                brokerProps.setProperty("broker.id", String.valueOf(index + 1));
                brokerProps.setProperty("host.name", "localhost");
                brokerProps.setProperty("port", Integer.toString(brokerPort));
                brokerProps.setProperty("log.dir", getTempDir().getAbsolutePath());
                brokerProps.setProperty("log.flush.interval.messages", String.valueOf(1));
                this.brokers.add(startBroker(brokerProps));
            }
            started = true;
        } finally {
            if (!started) {
                stopAfterFailedStartup();
            }
        }

        this.props.put("metadata.broker.list", brokerList.toString());
        this.props.put("zookeeper.connect", this.zookeeper.getConnection());
    }

    /**
     * Best-effort teardown used when construction fails part-way: stops every
     * broker started so far and then the ZooKeeper server. Failures during this
     * teardown are ignored so the original startup error reaches the caller.
     */
    private void stopAfterFailedStartup() {
        for (KafkaServer broker : this.brokers) {
            try {
                broker.shutdown();
            } catch (RuntimeException ignored) {
                // Keep going; the remaining components still need to be stopped.
            }
        }
        try {
            this.zookeeper.shutdown();
        } catch (RuntimeException ignored) {
            // The startup failure that triggered this teardown is the error worth reporting.
        }
    }

    /**
     * Returns the cluster's client properties as a fresh copy, so changes made
     * to the returned object do not affect this cluster.
     *
     * @return a new {@link Properties} holding every entry of the cluster's properties
     */
    public Properties getProps() {
        final Properties snapshot = new Properties();
        snapshot.putAll(this.props);
        return snapshot;
    }

    /**
     * Stops every broker and then the embedded ZooKeeper server. A failure
     * while stopping one component does not prevent the others from being
     * stopped; once all of them have been attempted, the first failure seen
     * (if any) is rethrown.
     */
    public void shutdown() {
        RuntimeException firstFailure = null;
        for (KafkaServer broker : this.brokers) {
            try {
                broker.shutdown();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        try {
            this.zookeeper.shutdown();
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Builds a broker from the given configuration, starts it and returns it.
     *
     * @param properties broker configuration used to construct the {@link KafkaConfig}
     * @return the broker, after {@code startup()} has been called on it
     */
    private static KafkaServer startBroker(Properties properties) {
        final KafkaConfig config = new KafkaConfig(properties);
        final KafkaServer broker = new KafkaServer(config, new SystemTime());
        broker.startup();
        return broker;
    }

    /**
     * Creates a new, uniquely named directory under {@code java.io.tmpdir}.
     * The name is {@code TEMP_DIR_PREFIX} followed by a random number; if that
     * name cannot be created (for example because a directory left over from
     * an earlier run already uses it) a few other random names are tried before
     * giving up.
     *
     * @return the directory that was created
     * @throws RuntimeException if no directory could be created
     */
    private static File getTempDir() {
        final int maxAttempts = 10;
        final String tmpRoot = System.getProperty("java.io.tmpdir");
        File candidate = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            candidate = new File(tmpRoot, TEMP_DIR_PREFIX + RANDOM.nextInt(10000000));
            // mkdirs() returns true only when this call created the directory,
            // so a name that already exists is never handed out twice.
            if (candidate.mkdirs()) {
                // Only effective if the directory is empty when the JVM exits.
                candidate.deleteOnExit();
                return candidate;
            }
        }
        throw new RuntimeException("could not create temp directory: " + candidate.getAbsolutePath());
    }

    /**
     * Finds a TCP port that is free right now by opening a server socket on
     * port 0 and reading back the port it was given. The socket is closed
     * before returning, so the port is not reserved for the caller.
     *
     * @return the port number the probe socket was bound to
     * @throws IOException if the probe socket cannot be opened or closed
     */
    private static int getAvailablePort() throws IOException {
        final ServerSocket probe = new ServerSocket(0);
        try {
            return probe.getLocalPort();
        } finally {
            probe.close();
        }
    }

    /**
     * Package-visible bridge that lets the nested classes reach the private
     * {@code getAvailablePort()}.
     *
     * @return the result of {@code getAvailablePort()}
     * @throws IOException propagated from {@code getAvailablePort()}
     */
    static /* synthetic */ int access$000() throws IOException {
        return KafkaCluster.getAvailablePort();
    }

    /**
     * Package-visible bridge that lets the nested classes reach the private
     * {@code getTempDir()}.
     *
     * @return the result of {@code getTempDir()}
     */
    static /* synthetic */ File access$100() {
        return KafkaCluster.getTempDir();
    }
}
