package com.linkedin.camus.etl.kafka;

import com.google.gson.Gson;
import com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder;
import com.linkedin.camus.etl.kafka.common.EtlCountsForUnitTest;
import com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlInputFormatForUnitTest;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import com.linkedin.camus.etl.kafka.mapred.EtlRecordReaderForUnitTest;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.cluster.Broker;
import kafka.javaapi.FetchRequest;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.producer.KeyedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.easymock.EasyMock;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/CamusJobTestWithMock.class */
public class CamusJobTestWithMock {
    private static final String BASE_PATH = "/camus";
    private static final String DESTINATION_PATH = "/camus/destination";
    private static final String EXECUTION_BASE_PATH = "/camus/execution";
    private static final String EXECUTION_HISTORY_PATH = "/camus/execution/history";
    private static final String KAFKA_HOST = "localhost";
    private static final int KAFKA_PORT = 2121;
    private static final int KAFKA_TIMEOUT_VALUE = 1000;
    private static final int KAFKA_BUFFER_SIZE = 1024;
    private static final String KAFKA_CLIENT_ID = "Camus";
    private static final String TOPIC_1 = "topic_1";
    private static final int PARTITION_1_ID = 0;
    private static FileSystem fs;
    private static Gson gson;
    private static Map<String, List<MyMessage>> messagesWritten;
    private Properties props;
    private CamusJob job;
    private TemporaryFolder folder;
    private String destinationPath;
    private static final Random RANDOM = new Random();
    private static List<Object> mocks = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/camus/etl/kafka/CamusJobTestWithMock$MyMessage.class */
    public static class MyMessage {
        private int number;

        public MyMessage() {
        }

        public MyMessage(int i) {
            this.number = i;
        }

        public boolean equals(Object obj) {
            return obj != null && (obj instanceof MyMessage) && this.number == ((MyMessage) obj).number;
        }
    }

    @BeforeClass
    public static void beforeClass() throws IOException {
        fs = FileSystem.get(new Configuration());
        gson = new Gson();
        messagesWritten = new HashMap();
        messagesWritten.put(TOPIC_1, writeKafka(TOPIC_1, 10));
    }

    @AfterClass
    public static void afterClass() {
    }

    @Before
    public void before() throws IOException, NoSuchFieldException, IllegalAccessException {
        this.folder = new TemporaryFolder();
        this.folder.create();
        String absolutePath = this.folder.getRoot().getAbsolutePath();
        this.destinationPath = absolutePath + DESTINATION_PATH;
        this.props = new Properties();
        this.props.setProperty("etl.destination.path", this.destinationPath);
        this.props.setProperty("etl.execution.base.path", absolutePath + EXECUTION_BASE_PATH);
        this.props.setProperty("etl.execution.history.path", absolutePath + EXECUTION_HISTORY_PATH);
        this.props.setProperty("camus.message.decoder.class", JsonStringMessageDecoder.class.getName());
        this.props.setProperty("etl.record.writer.provider.class", SequenceFileRecordWriterProvider.class.getName());
        this.props.setProperty("etl.run.tracking.post", Boolean.toString(false));
        this.props.setProperty("kafka.client.name", KAFKA_CLIENT_ID);
        this.props.setProperty("kafka.timeout.value", Integer.toString(KAFKA_TIMEOUT_VALUE));
        this.props.setProperty("kafka.fetch.buffer.size", Integer.toString(KAFKA_BUFFER_SIZE));
        this.props.setProperty("kafka.brokers", "localhost:2121");
        this.props.setProperty("mapreduce.framework.name", "local");
        this.props.setProperty("mapreduce.jobtracker.address", "local");
        this.job = new CamusJob(this.props);
    }

    @After
    public void after() throws IOException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
        this.folder.delete();
        mocks.clear();
        EtlInputFormatForUnitTest.reset();
        EtlRecordReaderForUnitTest.reset();
        EtlCountsForUnitTest.reset();
        Field declaredField = EtlMultiOutputFormat.class.getDeclaredField("committer");
        declaredField.setAccessible(true);
        declaredField.set(null, null);
    }

    @Test
    public void testRegularJob() throws Exception {
        setupRegularJob();
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
        verifyJobSucceed();
    }

    private void setupRegularJob() {
        TopicMetadataResponse mockTopicMetaDataResponse = mockTopicMetaDataResponse();
        List<MyMessage> list = messagesWritten.get(TOPIC_1);
        OffsetResponse mockOffsetResponse = mockOffsetResponse(list);
        FetchResponse mockFetchResponse = mockFetchResponse(list);
        EtlInputFormatForUnitTest.consumerType = EtlInputFormatForUnitTest.ConsumerType.MOCK;
        EtlInputFormatForUnitTest.consumer = mockSimpleConsumer(mockTopicMetaDataResponse, mockOffsetResponse, mockFetchResponse);
        EasyMock.replay(mocks.toArray());
    }

    private TopicMetadataResponse mockTopicMetaDataResponse() {
        PartitionMetadata partitionMetadata = (PartitionMetadata) EasyMock.createMock(PartitionMetadata.class);
        mocks.add(partitionMetadata);
        EasyMock.expect(Short.valueOf(partitionMetadata.errorCode())).andReturn((short) 0).anyTimes();
        EasyMock.expect(partitionMetadata.leader()).andReturn(new Broker(PARTITION_1_ID, KAFKA_HOST, KAFKA_PORT)).anyTimes();
        EasyMock.expect(Integer.valueOf(partitionMetadata.partitionId())).andReturn(Integer.valueOf(PARTITION_1_ID)).anyTimes();
        ArrayList arrayList = new ArrayList();
        arrayList.add(partitionMetadata);
        TopicMetadata topicMetadata = (TopicMetadata) EasyMock.createMock(TopicMetadata.class);
        mocks.add(topicMetadata);
        EasyMock.expect(topicMetadata.topic()).andReturn(TOPIC_1).anyTimes();
        EasyMock.expect(Short.valueOf(topicMetadata.errorCode())).andReturn((short) 0).anyTimes();
        EasyMock.expect(topicMetadata.partitionsMetadata()).andReturn(arrayList).anyTimes();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(topicMetadata);
        TopicMetadataResponse topicMetadataResponse = (TopicMetadataResponse) EasyMock.createMock(TopicMetadataResponse.class);
        mocks.add(topicMetadataResponse);
        EasyMock.expect(topicMetadataResponse.topicsMetadata()).andReturn(arrayList2).anyTimes();
        return topicMetadataResponse;
    }

    private OffsetResponse mockOffsetResponse(List<MyMessage> list) {
        OffsetResponse offsetResponse = (OffsetResponse) EasyMock.createMock(OffsetResponse.class);
        mocks.add(offsetResponse);
        EasyMock.expect(offsetResponse.offsets(EasyMock.anyString(), EasyMock.anyInt())).andReturn(new long[]{list.size()}).times(1);
        EasyMock.expect(offsetResponse.offsets(EasyMock.anyString(), EasyMock.anyInt())).andReturn(new long[]{0}).times(1);
        EasyMock.expect(Boolean.valueOf(offsetResponse.hasError())).andReturn(false).times(2);
        return offsetResponse;
    }

    private FetchResponse mockFetchResponse(List<MyMessage> list) {
        FetchResponse fetchResponse = (FetchResponse) EasyMock.createMock(FetchResponse.class);
        EasyMock.expect(Boolean.valueOf(fetchResponse.hasError())).andReturn(false).times(1);
        ArrayList arrayList = new ArrayList();
        Iterator<MyMessage> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new Message(gson.toJson(it.next()).getBytes(), Integer.toString(PARTITION_1_ID).getBytes()));
        }
        EasyMock.expect(fetchResponse.messageSet(EasyMock.anyString(), EasyMock.anyInt())).andReturn(new ByteBufferMessageSet(arrayList)).times(1);
        mocks.add(fetchResponse);
        return fetchResponse;
    }

    private SimpleConsumer mockSimpleConsumer(TopicMetadataResponse topicMetadataResponse, OffsetResponse offsetResponse, FetchResponse fetchResponse) {
        SimpleConsumer simpleConsumer = (SimpleConsumer) EasyMock.createMock(SimpleConsumer.class);
        mocks.add(simpleConsumer);
        EasyMock.expect(simpleConsumer.send((TopicMetadataRequest) EasyMock.anyObject())).andReturn(topicMetadataResponse).times(1);
        EasyMock.expect(simpleConsumer.getOffsetsBefore((OffsetRequest) EasyMock.anyObject())).andReturn(offsetResponse).anyTimes();
        simpleConsumer.close();
        EasyMock.expectLastCall().andVoid().anyTimes();
        EasyMock.expect(simpleConsumer.clientId()).andReturn(KAFKA_CLIENT_ID).times(1);
        EasyMock.expect(simpleConsumer.fetch((FetchRequest) EasyMock.anyObject())).andReturn(fetchResponse).times(1);
        EasyMock.expect(simpleConsumer.host()).andReturn("dummyHost").anyTimes();
        EasyMock.expect(Integer.valueOf(simpleConsumer.port())).andReturn(8888).anyTimes();
        return simpleConsumer;
    }

    private void verifyJobSucceed() throws Exception {
        EasyMock.verify(mocks.toArray());
        assertCamusContains(TOPIC_1);
    }

    @Test(expected = RuntimeException.class)
    public void testJobFailDueToOffsetRangeCallException() throws Exception {
        setupJobFailDueToOffsetRangeCallException();
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
    }

    private void setupJobFailDueToOffsetRangeCallException() {
        TopicMetadataResponse mockTopicMetaDataResponse = mockTopicMetaDataResponse();
        EtlInputFormatForUnitTest.consumerType = EtlInputFormatForUnitTest.ConsumerType.MOCK;
        EtlInputFormatForUnitTest.consumer = mockConsumerThrowsExceptionForOffsetRangeCall(mockTopicMetaDataResponse);
        EasyMock.replay(mocks.toArray());
    }

    private SimpleConsumer mockConsumerThrowsExceptionForOffsetRangeCall(TopicMetadataResponse topicMetadataResponse) {
        SimpleConsumer simpleConsumer = (SimpleConsumer) EasyMock.createMock(SimpleConsumer.class);
        mocks.add(simpleConsumer);
        EasyMock.expect(simpleConsumer.send((TopicMetadataRequest) EasyMock.anyObject())).andReturn(topicMetadataResponse).times(1);
        EasyMock.expect(simpleConsumer.getOffsetsBefore((OffsetRequest) EasyMock.anyObject())).andThrow(new RuntimeException()).times(3);
        EasyMock.expect(simpleConsumer.clientId()).andReturn(KAFKA_CLIENT_ID).times(1);
        simpleConsumer.close();
        EasyMock.expectLastCall().andVoid().times(2);
        EasyMock.expect(simpleConsumer.host()).andReturn("dummyHost").times(4);
        EasyMock.expect(Integer.valueOf(simpleConsumer.port())).andReturn(8888).times(4);
        return simpleConsumer;
    }

    @Test(expected = RuntimeException.class)
    public void testJobFailDueToOffsetRangeCallError() throws Exception {
        setupJobFailDueToOffsetRangeCallError();
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
    }

    private void setupJobFailDueToOffsetRangeCallError() {
        TopicMetadataResponse mockTopicMetaDataResponse = mockTopicMetaDataResponse();
        OffsetResponse mockOffsetResponseWithError = mockOffsetResponseWithError(messagesWritten.get(TOPIC_1));
        EtlInputFormatForUnitTest.consumerType = EtlInputFormatForUnitTest.ConsumerType.MOCK;
        EtlInputFormatForUnitTest.consumer = mockSimpleConsumer(mockTopicMetaDataResponse, mockOffsetResponseWithError, null);
        EasyMock.replay(mocks.toArray());
    }

    private OffsetResponse mockOffsetResponseWithError(List<MyMessage> list) {
        OffsetResponse offsetResponse = (OffsetResponse) EasyMock.createMock(OffsetResponse.class);
        mocks.add(offsetResponse);
        EasyMock.expect(offsetResponse.offsets(EasyMock.anyString(), EasyMock.anyInt())).andReturn(new long[]{list.size()}).times(1);
        EasyMock.expect(offsetResponse.offsets(EasyMock.anyString(), EasyMock.anyInt())).andReturn(new long[]{0}).times(1);
        EasyMock.expect(Boolean.valueOf(offsetResponse.hasError())).andReturn(true).times(3);
        return offsetResponse;
    }

    @Test
    public void testJobOffsetRangeCallThirdTrySucceed() throws Exception {
        setupJobOffsetRangeCallThirdTrySucceed();
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
        verifyJobSucceed();
    }

    private void setupJobOffsetRangeCallThirdTrySucceed() {
        TopicMetadataResponse mockTopicMetaDataResponse = mockTopicMetaDataResponse();
        List<MyMessage> list = messagesWritten.get(TOPIC_1);
        OffsetResponse mockOffsetResponseThirdTrySucceed = mockOffsetResponseThirdTrySucceed(list);
        FetchResponse mockFetchResponse = mockFetchResponse(list);
        EtlInputFormatForUnitTest.consumerType = EtlInputFormatForUnitTest.ConsumerType.MOCK;
        EtlInputFormatForUnitTest.consumer = mockSimpleConsumer(mockTopicMetaDataResponse, mockOffsetResponseThirdTrySucceed, mockFetchResponse);
        EasyMock.replay(mocks.toArray());
    }

    private OffsetResponse mockOffsetResponseThirdTrySucceed(List<MyMessage> list) {
        OffsetResponse offsetResponse = (OffsetResponse) EasyMock.createMock(OffsetResponse.class);
        mocks.add(offsetResponse);
        EasyMock.expect(offsetResponse.offsets(EasyMock.anyString(), EasyMock.anyInt())).andReturn(new long[]{list.size()}).times(1);
        EasyMock.expect(offsetResponse.offsets(EasyMock.anyString(), EasyMock.anyInt())).andReturn(new long[]{0}).times(1);
        EasyMock.expect(Boolean.valueOf(offsetResponse.hasError())).andReturn(true).times(2);
        EasyMock.expect(Boolean.valueOf(offsetResponse.hasError())).andReturn(false).times(2);
        return offsetResponse;
    }

    @Test(expected = RuntimeException.class)
    public void testJobFailTooManySkippedMsgSchemaNotFound() throws Exception {
        this.props.setProperty("etl.max.percent.skipped.schemanotfound", "10.0");
        setupJobWithSkippedMsgSchemaNotFound();
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
    }

    @Test
    public void testJobSucceedWithSkippedMsgSchemaNotFound() throws Exception {
        this.props.setProperty("etl.max.percent.skipped.schemanotfound", "40.0");
        setupJobWithSkippedMsgSchemaNotFound();
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
        EasyMock.verify(mocks.toArray());
    }

    private void setupJobWithSkippedMsgSchemaNotFound() {
        EtlInputFormatForUnitTest.consumerType = EtlInputFormatForUnitTest.ConsumerType.MOCK;
        EtlInputFormatForUnitTest.recordReaderClass = EtlInputFormatForUnitTest.RecordReaderClass.TEST;
        EtlRecordReaderForUnitTest.decoderType = EtlRecordReaderForUnitTest.DecoderType.SCHEMA_NOT_FOUND_30_PERCENT;
        TopicMetadataResponse mockTopicMetaDataResponse = mockTopicMetaDataResponse();
        List<MyMessage> list = messagesWritten.get(TOPIC_1);
        EtlInputFormatForUnitTest.consumer = mockSimpleConsumer(mockTopicMetaDataResponse, mockOffsetResponse(list), mockFetchResponse(list));
        EasyMock.replay(mocks.toArray());
    }

    @Test(expected = RuntimeException.class)
    public void testJobFailTooManySkippedMsgOther() throws Exception {
        this.props.setProperty("etl.max.percent.skipped.other", "10.0");
        setupJobWithSkippedMsgOther();
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
    }

    @Test
    public void testJobSucceedWithSkippedMsgOther() throws Exception {
        this.props.setProperty("etl.max.percent.skipped.other", "40.0");
        setupJobWithSkippedMsgOther();
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
        EasyMock.verify(mocks.toArray());
    }

    private void setupJobWithSkippedMsgOther() {
        EtlInputFormatForUnitTest.consumerType = EtlInputFormatForUnitTest.ConsumerType.MOCK;
        EtlInputFormatForUnitTest.recordReaderClass = EtlInputFormatForUnitTest.RecordReaderClass.TEST;
        EtlRecordReaderForUnitTest.decoderType = EtlRecordReaderForUnitTest.DecoderType.OTHER_30_PERCENT;
        TopicMetadataResponse mockTopicMetaDataResponse = mockTopicMetaDataResponse();
        List<MyMessage> list = messagesWritten.get(TOPIC_1);
        EtlInputFormatForUnitTest.consumer = mockSimpleConsumer(mockTopicMetaDataResponse, mockOffsetResponse(list), mockFetchResponse(list));
        EasyMock.replay(mocks.toArray());
    }

    @Test(expected = RuntimeException.class)
    public void testJobFailOffsetTooEarly() throws Exception {
        EtlInputFormatForUnitTest.camusRequestType = EtlInputFormatForUnitTest.CamusRequestType.MOCK_OFFSET_TOO_EARLY;
        setupRegularJob();
        this.props.setProperty("kafka.move.to.earliest.offset", Boolean.FALSE.toString());
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
    }

    @Test(expected = RuntimeException.class)
    public void testJobFailOffsetTooLate() throws Exception {
        EtlInputFormatForUnitTest.camusRequestType = EtlInputFormatForUnitTest.CamusRequestType.MOCK_OFFSET_TOO_LATE;
        setupRegularJob();
        this.props.setProperty("kafka.move.to.earliest.offset", Boolean.FALSE.toString());
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
    }

    @Test(expected = RuntimeException.class)
    public void testJobFailDueToPublishCountsException() throws Exception {
        setupJobFailDueToPublishCountsException();
        this.props.setProperty("etl.run.tracking.post", Boolean.TRUE.toString());
        this.props.setProperty("camus.message.encoder.class", "com.linkedin.camus.etl.kafka.coders.EncoderForUnitTest");
        this.props.setProperty("etl.counts.class", "com.linkedin.camus.etl.kafka.common.EtlCountsForUnitTest");
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
    }

    private void setupJobFailDueToPublishCountsException() {
        TopicMetadataResponse mockTopicMetaDataResponse = mockTopicMetaDataResponse();
        List<MyMessage> list = messagesWritten.get(TOPIC_1);
        OffsetResponse mockOffsetResponse = mockOffsetResponse(list);
        FetchResponse mockFetchResponse = mockFetchResponse(list);
        EtlInputFormatForUnitTest.consumerType = EtlInputFormatForUnitTest.ConsumerType.MOCK;
        EtlInputFormatForUnitTest.consumer = mockSimpleConsumer(mockTopicMetaDataResponse, mockOffsetResponse, mockFetchResponse);
        EtlCountsForUnitTest.producerType = EtlCountsForUnitTest.ProducerType.SEND_THROWS_EXCEPTION;
        EasyMock.replay(mocks.toArray());
    }

    @Test
    public void testJobPublishCountsThirdTimeSucceed() throws Exception {
        setupJobPublishCountsThirdTimeSucceed();
        this.props.setProperty("etl.run.tracking.post", Boolean.TRUE.toString());
        this.props.setProperty("camus.message.encoder.class", "com.linkedin.camus.etl.kafka.coders.EncoderForUnitTest");
        this.props.setProperty("etl.counts.class", "com.linkedin.camus.etl.kafka.common.EtlCountsForUnitTest");
        this.job = new CamusJob(this.props);
        this.job.run(EtlInputFormatForUnitTest.class, EtlMultiOutputFormat.class);
        verifyJobSucceed();
    }

    private void setupJobPublishCountsThirdTimeSucceed() {
        TopicMetadataResponse mockTopicMetaDataResponse = mockTopicMetaDataResponse();
        List<MyMessage> list = messagesWritten.get(TOPIC_1);
        OffsetResponse mockOffsetResponse = mockOffsetResponse(list);
        FetchResponse mockFetchResponse = mockFetchResponse(list);
        EtlInputFormatForUnitTest.consumerType = EtlInputFormatForUnitTest.ConsumerType.MOCK;
        EtlInputFormatForUnitTest.consumer = mockSimpleConsumer(mockTopicMetaDataResponse, mockOffsetResponse, mockFetchResponse);
        EtlCountsForUnitTest.producerType = EtlCountsForUnitTest.ProducerType.SEND_SUCCEED_THIRD_TIME;
        EasyMock.replay(mocks.toArray());
    }

    private void assertCamusContains(String str) throws InstantiationException, IllegalAccessException, IOException {
        assertCamusContains(str, messagesWritten.get(str));
    }

    private void assertCamusContains(String str, List<MyMessage> list) throws InstantiationException, IllegalAccessException, IOException {
        Assert.assertThat(Integer.valueOf(readMessages(str).size()), Is.is(Integer.valueOf(list.size())));
        Assert.assertTrue(readMessages(str).containsAll(list));
    }

    private static List<MyMessage> writeKafka(String str, int i) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = PARTITION_1_ID; i2 < i; i2++) {
            MyMessage myMessage = new MyMessage(RANDOM.nextInt());
            arrayList.add(myMessage);
            arrayList2.add(new KeyedMessage(str, Integer.toString(i2), gson.toJson(myMessage)));
        }
        return arrayList;
    }

    private List<MyMessage> readMessages(String str) throws IOException, InstantiationException, IllegalAccessException {
        return readMessages(new Path(this.destinationPath, str));
    }

    private List<MyMessage> readMessages(Path path) throws IOException, InstantiationException, IllegalAccessException {
        ArrayList arrayList = new ArrayList();
        try {
            FileStatus[] listStatus = fs.listStatus(path);
            int length = listStatus.length;
            for (int i = PARTITION_1_ID; i < length; i++) {
                FileStatus fileStatus = listStatus[i];
                if (fileStatus.isDir()) {
                    arrayList.addAll(readMessages(fileStatus.getPath()));
                } else {
                    SequenceFile.Reader reader = new SequenceFile.Reader(fs, fileStatus.getPath(), new Configuration());
                    try {
                        LongWritable longWritable = (LongWritable) reader.getKeyClass().newInstance();
                        Text text = (Text) reader.getValueClass().newInstance();
                        while (reader.next(longWritable, text)) {
                            arrayList.add(gson.fromJson(text.toString(), MyMessage.class));
                        }
                        reader.close();
                    } finally {
                    }
                }
            }
        } catch (FileNotFoundException e) {
            System.out.println("No camus messages were found in [" + path + "]");
        }
        return arrayList;
    }

    private static TopicMetadataResponse createTopicMetadataResponseFromBytes(TopicMetadataRequest topicMetadataRequest) {
        ByteBuffer allocate = ByteBuffer.allocate(KAFKA_BUFFER_SIZE);
        allocate.putInt(PARTITION_1_ID);
        allocate.putInt(1);
        allocate.putInt(PARTITION_1_ID);
        allocate.putShort((short) KAFKA_HOST.length());
        try {
            allocate.put(KAFKA_HOST.getBytes("UTF-8"));
            allocate.putInt(2012);
            allocate.putInt(1);
            allocate.putShort((short) 0);
            allocate.putShort((short) TOPIC_1.length());
            try {
                allocate.put(TOPIC_1.getBytes("UTF-8"));
                allocate.putInt(1);
                allocate.putShort((short) 0);
                allocate.putInt(PARTITION_1_ID);
                allocate.putInt(PARTITION_1_ID);
                allocate.putInt(PARTITION_1_ID);
                allocate.putInt(PARTITION_1_ID);
                allocate.rewind();
                return new TopicMetadataResponse(kafka.api.TopicMetadataResponse.readFrom(allocate));
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        } catch (UnsupportedEncodingException e2) {
            throw new RuntimeException(e2);
        }
    }
}
