package com.linkedin.camus.etl.kafka;

import com.google.gson.Gson;
import com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder;
import com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/CamusJobTest.class */
/**
 * End-to-end test for {@link CamusJob}.
 *
 * <p>Once per class, a {@code KafkaCluster} is started and a fixed set of randomly numbered JSON
 * messages is produced to three topics. Each test then runs a {@link CamusJob} whose destination and
 * execution paths live under a fresh temporary folder, and verifies that the sequence files written
 * under the destination path contain the messages that were produced.
 */
public class CamusJobTest {
    private static final Random RANDOM = new Random();
    private static final String BASE_PATH = "/camus";
    private static final String DESTINATION_PATH = BASE_PATH + "/destination";
    private static final String EXECUTION_BASE_PATH = BASE_PATH + "/execution";
    private static final String EXECUTION_HISTORY_PATH = EXECUTION_BASE_PATH + "/history";
    private static final String TOPIC_1 = "topic_1";
    private static final String TOPIC_2 = "topic_2";
    private static final String TOPIC_3 = "topic_3";
    private static KafkaCluster cluster;
    private static FileSystem fs;
    private static Gson gson;
    /** Messages produced to Kafka in {@link #beforeClass()}, keyed by topic name. */
    private static Map<String, List<Message>> messagesWritten;
    private Properties props;
    private CamusJob job;
    private TemporaryFolder folder;
    private String destinationPath;

    /**
     * Payload that is serialized to JSON with Gson before being produced to Kafka and deserialized
     * again when the job output is read back. Two messages are equal when their numbers are equal.
     */
    public static class Message {
        private int number;

        /** No-argument constructor (presumably the one Gson uses when deserializing). */
        public Message() {
        }

        public Message(int number) {
            this.number = number;
        }

        @Override
        public boolean equals(Object obj) {
            // instanceof is false for null, so no separate null check is required.
            return (obj instanceof Message) && this.number == ((Message) obj).number;
        }

        /** Kept consistent with {@link #equals(Object)}: equal numbers yield equal hash codes. */
        @Override
        public int hashCode() {
            return this.number;
        }

        /** Readable form so that assertion failure messages show the message contents. */
        @Override
        public String toString() {
            return "Message{number=" + this.number + "}";
        }
    }

    /**
     * Starts the Kafka cluster, obtains the file system and produces 10 messages to each test topic,
     * remembering what was written so the tests can compare against it.
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        cluster = new KafkaCluster();
        fs = FileSystem.get(new Configuration());
        gson = new Gson();
        messagesWritten = new HashMap<String, List<Message>>();
        messagesWritten.put(TOPIC_1, writeKafka(TOPIC_1, 10));
        messagesWritten.put(TOPIC_2, writeKafka(TOPIC_2, 10));
        messagesWritten.put(TOPIC_3, writeKafka(TOPIC_3, 10));
    }

    /** Shuts the Kafka cluster down, if it was started. */
    @AfterClass
    public static void afterClass() {
        // Guard so that a failure while constructing the cluster is not masked by an NPE here.
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    /**
     * Creates a fresh temporary folder and a {@link CamusJob} configured to write its destination,
     * execution and history data beneath that folder.
     */
    @Before
    public void before() throws IOException {
        this.folder = new TemporaryFolder();
        this.folder.create();
        String rootPath = this.folder.getRoot().getAbsolutePath();
        this.destinationPath = rootPath + DESTINATION_PATH;
        this.props = cluster.getProps();
        this.props.setProperty("etl.destination.path", this.destinationPath);
        this.props.setProperty("etl.execution.base.path", rootPath + EXECUTION_BASE_PATH);
        this.props.setProperty("etl.execution.history.path", rootPath + EXECUTION_HISTORY_PATH);
        this.props.setProperty("camus.message.decoder.class", JsonStringMessageDecoder.class.getName());
        this.props.setProperty("etl.record.writer.provider.class", SequenceFileRecordWriterProvider.class.getName());
        this.props.setProperty("etl.run.tracking.post", Boolean.toString(false));
        this.props.setProperty("kafka.client.name", "Camus");
        this.props.setProperty("kafka.brokers", this.props.getProperty("metadata.broker.list"));
        this.props.setProperty("mapreduce.jobtracker.address", "local");
        this.job = new CamusJob(this.props);
    }

    /** Removes the temporary folder created for the test, if any. */
    @After
    public void after() throws IOException {
        if (this.folder != null) {
            this.folder.delete();
        }
    }

    /**
     * Runs the job twice with the same properties and checks after each run that every topic's
     * output contains exactly as many messages as were produced, including all produced messages.
     */
    @Test
    public void runJob() throws Exception {
        this.job.run();
        assertCamusContains(TOPIC_1);
        assertCamusContains(TOPIC_2);
        assertCamusContains(TOPIC_3);

        // Second run with a new job instance: the assertions require the output to be unchanged.
        this.job = new CamusJob(this.props);
        this.job.run();
        assertCamusContains(TOPIC_1);
        assertCamusContains(TOPIC_2);
        assertCamusContains(TOPIC_3);
    }

    /** Asserts that the job output for {@code topic} matches what {@link #beforeClass()} produced. */
    private void assertCamusContains(String topic) throws InstantiationException, IllegalAccessException, IOException {
        assertCamusContains(topic, messagesWritten.get(topic));
    }

    /**
     * Asserts that the job output for {@code topic} has the same number of messages as
     * {@code expected} and contains every one of them.
     */
    private void assertCamusContains(String topic, List<Message> expected) throws InstantiationException, IllegalAccessException, IOException {
        // Read the output once and reuse it for both assertions instead of re-reading the files.
        List<Message> actual = readMessages(topic);
        Assert.assertThat("Unexpected number of messages for topic " + topic, actual.size(), Is.is(expected.size()));
        Assert.assertTrue("Output for topic " + topic + " is missing messages: expected " + expected + " but read " + actual,
                actual.containsAll(expected));
    }

    /**
     * Produces {@code count} messages with random numbers to {@code topic}, keyed by their index.
     *
     * @return the messages that were sent, in send order
     */
    private static List<Message> writeKafka(String topic, int count) {
        List<Message> messages = new ArrayList<Message>();
        List<KeyedMessage<String, String>> kafkaMessages = new ArrayList<KeyedMessage<String, String>>();
        for (int index = 0; index < count; index++) {
            Message message = new Message(RANDOM.nextInt());
            messages.add(message);
            kafkaMessages.add(new KeyedMessage<String, String>(topic, Integer.toString(index), gson.toJson(message)));
        }
        Properties producerProps = cluster.getProps();
        producerProps.setProperty("serializer.class", StringEncoder.class.getName());
        producerProps.setProperty("key.serializer.class", StringEncoder.class.getName());
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(producerProps));
        try {
            producer.send(kafkaMessages);
        } finally {
            // Always release the producer, whether or not the send succeeded.
            producer.close();
        }
        return messages;
    }

    /** Reads all messages the job wrote for {@code topic} beneath the destination path. */
    private List<Message> readMessages(String topic) throws IOException, InstantiationException, IllegalAccessException {
        return readMessages(new Path(this.destinationPath, topic));
    }

    /**
     * Recursively reads every sequence file beneath {@code path} and deserializes each record value
     * from JSON into a {@link Message}.
     */
    private List<Message> readMessages(Path path) throws IOException, InstantiationException, IllegalAccessException {
        List<Message> messages = new ArrayList<Message>();
        FileStatus[] statuses = fs.listStatus(path);
        if (statuses == null) {
            // NOTE(review): some Hadoop versions appear to return null for a nonexistent path;
            // treat that as "no messages" so the size assertion reports the problem instead of an NPE.
            return messages;
        }
        for (FileStatus status : statuses) {
            if (status.isDir()) {
                messages.addAll(readMessages(status.getPath()));
            } else {
                SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), fs.getConf());
                try {
                    LongWritable key = (LongWritable) reader.getKeyClass().newInstance();
                    Text value = (Text) reader.getValueClass().newInstance();
                    while (reader.next(key, value)) {
                        messages.add(gson.fromJson(value.toString(), Message.class));
                    }
                } finally {
                    reader.close();
                }
            }
        }
        return messages;
    }
}
