package com.linkedin.camus.etl.kafka.mapred;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
import com.linkedin.camus.etl.kafka.CamusJob;
import com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder;
import com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.EtlRequest;
import com.linkedin.camus.etl.kafka.common.LeaderInfo;
import java.io.IOException;
import java.net.URI;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/mapred/EtlInputFormat.class */
public class EtlInputFormat extends InputFormat<EtlKey, CamusWrapper> {
    // Configuration keys read and written through the static accessors of this class.

    /** Key for the topics excluded from the pull; matching topics are discarded in getSplits. */
    public static final String KAFKA_BLACKLIST_TOPIC = "kafka.blacklist.topics";
    /** Key for the topics to keep; when non-empty, non-matching topics are discarded in getSplits. */
    public static final String KAFKA_WHITELIST_TOPIC = "kafka.whitelist.topics";
    /** Key for the topics whose stored offset is replaced by the latest offset; the entry "all" applies to every topic. */
    public static final String KAFKA_MOVE_TO_LAST_OFFSET_LIST = "kafka.move.to.last.offset.list";
    /** Key read by getKafkaClientBufferSize (default 2097152). */
    public static final String KAFKA_CLIENT_BUFFER_SIZE = "kafka.client.buffer.size";
    /** Key read by getKafkaClientTimeout (default 60000). */
    public static final String KAFKA_CLIENT_SO_TIMEOUT = "kafka.client.so.timeout";
    /** Key read by getKafkaMaxPullHrs (default -1). */
    public static final String KAFKA_MAX_PULL_HRS = "kafka.max.pull.hrs";
    /** Key read by getKafkaMaxPullMinutesPerTask (default -1). */
    public static final String KAFKA_MAX_PULL_MINUTES_PER_TASK = "kafka.max.pull.minutes.per.task";
    /** Key read by getKafkaMaxHistoricalDays (default -1). */
    public static final String KAFKA_MAX_HISTORICAL_DAYS = "kafka.max.historical.days";
    /** Key for the message decoder class; getMessageDecoderClass falls back to KafkaAvroMessageDecoder. */
    public static final String CAMUS_MESSAGE_DECODER_CLASS = "camus.message.decoder.class";
    /** Key read by getEtlIgnoreSchemaErrors (default false). */
    public static final String ETL_IGNORE_SCHEMA_ERRORS = "etl.ignore.schema.errors";
    /** Key read by getEtlAuditIgnoreServiceTopicList (default: a single empty string). */
    public static final String ETL_AUDIT_IGNORE_SERVICE_TOPIC_LIST = "etl.audit.ignore.service.topic.list";
    // Shared logger: assigned by the constructor when still null, or replaced through setLogger.
    private static Logger log = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/camus/etl/kafka/mapred/EtlInputFormat$OffsetFileFilter.class */
    /**
     * Path filter that accepts only paths whose final name component starts with
     * {@code EtlMultiOutputFormat.OFFSET_PREFIX}.
     */
    public class OffsetFileFilter implements PathFilter {
        /** Private: instances are only created by the enclosing class. */
        private OffsetFileFilter() {
        }

        /**
         * @param path candidate path
         * @return {@code true} when the path's name begins with the offset-file prefix
         */
        public boolean accept(Path path) {
            final String fileName = path.getName();
            return fileName.startsWith(EtlMultiOutputFormat.OFFSET_PREFIX);
        }
    }

    /**
     * Creates the input format and, if no logger has been assigned yet, installs a logger named
     * after the runtime class of this instance.
     */
    public EtlInputFormat() {
        if (log != null) {
            // A logger was already installed (by an earlier instance or through setLogger).
            return;
        }
        log = Logger.getLogger(getClass());
    }

    /**
     * Replaces the logger shared by all instances of this class.
     *
     * @param logger logger to use from now on
     */
    public static void setLogger(Logger logger) {
        EtlInputFormat.log = logger;
    }

    /**
     * Creates the record reader for one input split.
     *
     * @param inputSplit split to read
     * @param taskAttemptContext context of the task attempt that will consume the split
     * @return a new {@code EtlRecordReader} built from both arguments
     */
    public RecordReader<EtlKey, CamusWrapper> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        RecordReader<EtlKey, CamusWrapper> reader = new EtlRecordReader(inputSplit, taskAttemptContext);
        return reader;
    }

    /**
     * Fetches topic metadata from Kafka, trying the configured brokers in random order until one
     * of them answers.
     *
     * <p>The metadata request is built from an empty topic list (presumably meaning "all topics" -
     * confirm against the Kafka API). The {@code kafkaSetupTime} timer is started on entry and
     * stopped only when a broker answered.
     *
     * @param jobContext job context the broker list and client settings are read from
     * @return the topic metadata returned by the first broker that responded
     * @throws InvalidParameterException if the configured broker list is empty
     * @throws RuntimeException if no broker answered; the last failure is attached as the cause
     */
    public List<TopicMetadata> getKafkaMetadata(JobContext jobContext) {
        List<String> topics = new ArrayList<String>();
        CamusJob.startTiming("kafkaSetupTime");

        String brokerList = CamusJob.getKafkaBrokers(jobContext);
        if (brokerList.isEmpty()) {
            throw new InvalidParameterException("kafka.brokers must contain at least one node");
        }
        List<String> brokers = Arrays.asList(brokerList.split("\\s*,\\s*"));
        // Randomise the order so that repeated runs do not always hit the same broker first.
        Collections.shuffle(brokers);

        List<TopicMetadata> metadata = null;
        Exception lastFailure = null;
        boolean fetched = false;
        for (int idx = 0; idx < brokers.size() && !fetched; idx++) {
            String broker = brokers.get(idx);
            SimpleConsumer consumer = createConsumer(jobContext, broker);
            log.info(String.format("Fetching metadata from broker %s with client id %s for %d topic(s) %s", broker, consumer.clientId(), topics.size(), topics));
            try {
                metadata = consumer.send(new TopicMetadataRequest(topics)).topicsMetadata();
                fetched = true;
            } catch (Exception e) {
                // Remember the failure and move on to the next broker.
                lastFailure = e;
                log.warn(String.format("Fetching topic metadata with client id %s for topics [%s] from broker [%s] failed", consumer.clientId(), topics, broker), e);
            } finally {
                consumer.close();
            }
        }

        if (!fetched) {
            throw new RuntimeException("Failed to obtain metadata!", lastFailure);
        }
        CamusJob.stopTiming("kafkaSetupTime");
        return metadata;
    }

    /**
     * Opens a {@code SimpleConsumer} for one broker given as {@code address:port}.
     *
     * <p>Timeout, buffer size and client name are taken from {@code CamusJob}.
     *
     * @param jobContext job context the client settings are read from
     * @param str broker in {@code address:port} form
     * @return a consumer connected to that broker; the caller is responsible for closing it
     * @throws InvalidParameterException if {@code str} does not match {@code .+:\d+}
     */
    private SimpleConsumer createConsumer(JobContext jobContext, String str) {
        if (!str.matches(".+:\\d+")) {
            throw new InvalidParameterException("The kakfa broker " + str + " must follow address:port pattern");
        }
        // Split on the LAST colon: the validation above accepts addresses that themselves contain
        // colons, and splitting on the first one would pick the wrong token as the port.
        int separator = str.lastIndexOf(':');
        String host = str.substring(0, separator);
        int port = Integer.parseInt(str.substring(separator + 1));
        return new SimpleConsumer(host, port, CamusJob.getKafkaTimeoutValue(jobContext), CamusJob.getKafkaBufferSize(jobContext), CamusJob.getKafkaClientName(jobContext));
    }

    /**
     * Asks each partition leader for the earliest and latest offsets of its partitions and turns
     * every partition into an {@code EtlRequest} carrying that offset range.
     *
     * @param jobContext job context the Kafka client settings are read from
     * @param hashMap partitions to query, grouped by the leader that serves them
     * @return one request per partition, in the iteration order of {@code hashMap}
     */
    public ArrayList<EtlRequest> fetchLatestOffsetAndCreateEtlRequests(JobContext jobContext, HashMap<LeaderInfo, ArrayList<TopicAndPartition>> hashMap) {
        ArrayList<EtlRequest> requests = new ArrayList<>();
        for (Map.Entry<LeaderInfo, ArrayList<TopicAndPartition>> entry : hashMap.entrySet()) {
            LeaderInfo leaderInfo = entry.getKey();
            List<TopicAndPartition> partitions = entry.getValue();

            // The same request-info object is shared by every partition of a query.
            PartitionOffsetRequestInfo latestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
            PartitionOffsetRequestInfo earliestInfo = new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> latestQuery = new HashMap<>();
            Map<TopicAndPartition, PartitionOffsetRequestInfo> earliestQuery = new HashMap<>();
            for (TopicAndPartition partition : partitions) {
                latestQuery.put(partition, latestInfo);
                earliestQuery.put(partition, earliestInfo);
            }

            SimpleConsumer consumer = new SimpleConsumer(leaderInfo.getUri().getHost(), leaderInfo.getUri().getPort(), CamusJob.getKafkaTimeoutValue(jobContext), CamusJob.getKafkaBufferSize(jobContext), CamusJob.getKafkaClientName(jobContext));
            OffsetResponse latestResponse;
            OffsetResponse earliestResponse;
            try {
                latestResponse = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(latestQuery, OffsetRequest.CurrentVersion(), CamusJob.getKafkaClientName(jobContext)));
                earliestResponse = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(earliestQuery, OffsetRequest.CurrentVersion(), CamusJob.getKafkaClientName(jobContext)));
            } finally {
                // Release the connection even when one of the offset queries fails.
                consumer.close();
            }

            for (TopicAndPartition partition : partitions) {
                long latestOffset = latestResponse.offsets(partition.topic(), partition.partition())[0];
                long earliestOffset = earliestResponse.offsets(partition.topic(), partition.partition())[0];
                EtlRequest request = new EtlRequest(jobContext, partition.topic(), Integer.toString(leaderInfo.getLeaderId()), partition.partition(), leaderInfo.getUri());
                request.setLatestOffset(latestOffset);
                request.setEarliestOffset(earliestOffset);
                requests.add(request);
            }
        }
        return requests;
    }

    /**
     * Builds a single regular expression of the form {@code (a|b|c)} out of the given entries.
     *
     * <p>Entries are inserted verbatim (not quoted), so each one may itself be a regular
     * expression. The result is compiled once purely to validate it.
     *
     * @param hashSet entries to combine; must not be empty
     * @return the combined expression
     * @throws IllegalArgumentException if {@code hashSet} is empty
     * @throws java.util.regex.PatternSyntaxException if the combined expression is not valid
     */
    public String createTopicRegEx(HashSet<String> hashSet) {
        if (hashSet.isEmpty()) {
            // Previously this surfaced as a StringIndexOutOfBoundsException from substring(0, -1).
            throw new IllegalArgumentException("Cannot build a topic regular expression from an empty topic set");
        }
        StringBuilder alternation = new StringBuilder("(");
        String separator = "";
        for (String topic : hashSet) {
            alternation.append(separator).append(topic);
            separator = "|";
        }
        String regex = alternation.append(')').toString();
        // Fail fast on a malformed expression instead of when it is first used.
        Pattern.compile(regex);
        return regex;
    }

    /**
     * Keeps only the topics whose full name matches the whitelist.
     *
     * @param list topic metadata to filter
     * @param hashSet whitelist entries, combined through {@code createTopicRegEx}
     * @return a new list with the matching topics, in their original order; rejected topics are
     *     logged at info level
     */
    public List<TopicMetadata> filterWhitelistTopics(List<TopicMetadata> list, HashSet<String> hashSet) {
        List<TopicMetadata> accepted = new ArrayList<TopicMetadata>();
        // Compile once instead of recompiling the expression for every topic.
        Pattern whitelist = Pattern.compile(createTopicRegEx(hashSet));
        for (TopicMetadata topicMetadata : list) {
            if (whitelist.matcher(topicMetadata.topic()).matches()) {
                accepted.add(topicMetadata);
            } else {
                log.info("Discarding topic : " + topicMetadata.topic());
            }
        }
        return accepted;
    }

    /**
     * Plans the Kafka pull and distributes it over input splits.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>fetch topic metadata and apply the whitelist and blacklist;</li>
     *   <li>drop topics for which no message decoder can be created and partitions that report
     *       an error code;</li>
     *   <li>group the remaining partitions by leader and query their earliest/latest offsets;</li>
     *   <li>write the requests file, then merge in the offsets stored by previous runs;</li>
     *   <li>reset any offset outside the [earliest, latest] range to the earliest offset;</li>
     *   <li>write the resulting previous offsets and allocate the requests to splits.</li>
     * </ol>
     *
     * @param jobContext job context carrying the configuration
     * @return the input splits for this job
     * @throws IOException if any step fails; the underlying failure is attached as the cause.
     *     (Earlier this method logged the failure and returned {@code null}.)
     */
    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        CamusJob.startTiming("getSplits");
        HashMap<LeaderInfo, ArrayList<TopicAndPartition>> partitionsByLeader = new HashMap<>();
        try {
            List<TopicMetadata> topics = getKafkaMetadata(jobContext);

            HashSet<String> whitelist = new HashSet<>(Arrays.asList(getKafkaWhitelistTopic(jobContext)));
            if (!whitelist.isEmpty()) {
                topics = filterWhitelistTopics(topics, whitelist);
            }

            HashSet<String> blacklist = new HashSet<>(Arrays.asList(getKafkaBlacklistTopic(jobContext)));
            // Compiled once for all topics. An empty blacklist yields the empty pattern, which
            // matches nothing but an empty topic name.
            Pattern blacklistPattern = Pattern.compile(blacklist.isEmpty() ? "" : createTopicRegEx(blacklist));

            for (TopicMetadata topicMetadata : topics) {
                String topic = topicMetadata.topic();
                if (blacklistPattern.matcher(topic).matches()) {
                    log.info("Discarding topic (blacklisted): " + topic);
                    continue;
                }
                if (!createMessageDecoder(jobContext, topic)) {
                    log.info("Discarding topic (Decoder generation failed) : " + topic);
                    continue;
                }
                for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                    if (partitionMetadata.errorCode() != ErrorMapping.NoError()) {
                        log.info("Skipping the creation of ETL request for Topic : " + topic + " and Partition : " + partitionMetadata.partitionId() + " Exception : " + ErrorMapping.exceptionFor(partitionMetadata.errorCode()));
                        continue;
                    }
                    LeaderInfo leaderInfo = new LeaderInfo(new URI("tcp://" + partitionMetadata.leader().getConnectionString()), partitionMetadata.leader().id());
                    ArrayList<TopicAndPartition> assigned = partitionsByLeader.get(leaderInfo);
                    if (assigned == null) {
                        assigned = new ArrayList<>();
                        partitionsByLeader.put(leaderInfo, assigned);
                    }
                    assigned.add(new TopicAndPartition(topic, partitionMetadata.partitionId()));
                }
            }

            ArrayList<EtlRequest> requests = fetchLatestOffsetAndCreateEtlRequests(jobContext, partitionsByLeader);
            Collections.sort(requests, new Comparator<EtlRequest>() {
                @Override
                public int compare(EtlRequest etlRequest, EtlRequest etlRequest2) {
                    return etlRequest.getTopic().compareTo(etlRequest2.getTopic());
                }
            });
            writeRequests(requests, jobContext);

            Map<EtlRequest, EtlKey> previousOffsets = getPreviousOffsets(FileInputFormat.getInputPaths(jobContext), jobContext);
            Set<String> moveToLatestTopics = getMoveToLatestTopicsSet(jobContext);
            for (EtlRequest request : requests) {
                if (moveToLatestTopics.contains(request.getTopic()) || moveToLatestTopics.contains("all")) {
                    // Pretend the previous run stopped at the latest offset, skipping the backlog.
                    previousOffsets.put(request, new EtlKey(request.getTopic(), request.getLeaderId(), request.getPartition(), 0L, request.getLastOffset()));
                }
                EtlKey previous = previousOffsets.get(request);
                if (previous != null) {
                    request.setOffset(previous.getOffset());
                }
                if (request.getEarliestOffset() > request.getOffset() || request.getOffset() > request.getLastOffset()) {
                    if (request.getEarliestOffset() > request.getOffset()) {
                        log.error("The earliest offset was found to be more than the current offset");
                        log.error("Moving to the earliest offset available");
                    } else {
                        log.error("The current offset was found to be more than the latest offset");
                        log.error("Moving to the earliest offset available");
                    }
                    // Either way the stored offset is unusable, so restart from the earliest one.
                    request.setOffset(request.getEarliestOffset());
                    previousOffsets.put(request, new EtlKey(request.getTopic(), request.getLeaderId(), request.getPartition(), 0L, request.getOffset()));
                }
                log.info(request);
            }
            writePrevious(previousOffsets.values(), jobContext);

            CamusJob.stopTiming("getSplits");
            CamusJob.startTiming("hadoop");
            CamusJob.setTime("hadoop_start");
            return allocateWork(requests, jobContext);
        } catch (Exception e) {
            log.error("Unable to pull requests from Kafka brokers. Exiting the program", e);
            // Returning null here would only make the caller fail later with an unrelated
            // NullPointerException; report the real cause instead.
            throw new IOException("Unable to pull requests from Kafka brokers.", e);
        }
    }

    /**
     * Returns the configured "move to latest offset" topics as a set.
     *
     * @param jobContext job context carrying the configuration
     * @return a new mutable set with the configured topics; empty when nothing is configured
     */
    private Set<String> getMoveToLatestTopicsSet(JobContext jobContext) {
        String[] configured = getMoveToLatestTopics(jobContext);
        if (configured == null) {
            return new HashSet<String>();
        }
        return new HashSet<String>(Arrays.asList(configured));
    }

    /**
     * Checks whether a message decoder can be created for a topic.
     *
     * @param jobContext job context passed to the decoder factory
     * @param str topic name
     * @return {@code true} if the factory call succeeded, {@code false} if it threw (the failure
     *     is logged at error level)
     */
    private boolean createMessageDecoder(JobContext jobContext, String str) {
        boolean created;
        try {
            // The decoder itself is discarded; only the outcome of the attempt matters here.
            MessageDecoderFactory.createMessageDecoder(jobContext, str);
            created = true;
        } catch (Exception e) {
            log.error("failed to create decoder", e);
            created = false;
        }
        return created;
    }

    /**
     * Distributes the requests over at most {@code mapred.map.tasks} splits (default 30).
     *
     * <p>Requests are sorted by descending estimated data size. The largest ones each seed a
     * split of their own; every remaining request is then added to whichever split
     * {@code getSmallestMultiSplit} picks at that moment.
     *
     * <p>Note: {@code list} is sorted in place and the seeding requests are removed from it.
     *
     * @param list requests to distribute
     * @param jobContext job context carrying the configuration
     * @return the populated splits
     */
    private List<InputSplit> allocateWork(List<EtlRequest> list, JobContext jobContext) throws IOException {
        final int mapTasks = jobContext.getConfiguration().getInt("mapred.map.tasks", 30);

        // Largest estimated size first.
        Collections.sort(list, new Comparator<EtlRequest>() {
            @Override
            public int compare(EtlRequest etlRequest, EtlRequest etlRequest2) {
                if (etlRequest.estimateDataSize() > etlRequest2.estimateDataSize()) {
                    return -1;
                }
                if (etlRequest.estimateDataSize() == etlRequest2.estimateDataSize()) {
                    return 0;
                }
                return 1;
            }
        });

        List<InputSplit> splits = new ArrayList<>();
        int remainingSlots = mapTasks;
        while (remainingSlots-- > 0 && !list.isEmpty()) {
            EtlSplit seeded = new EtlSplit();
            seeded.addRequest(list.get(0));
            splits.add(seeded);
            list.remove(0);
        }

        for (EtlRequest leftover : list) {
            getSmallestMultiSplit(splits).addRequest(leftover);
        }
        return splits;
    }

    /**
     * Finds the split with the smallest length; among splits of equal length the one holding
     * fewer requests wins, and on a full tie the earliest one in the list is kept.
     *
     * @param list splits to inspect; must contain at least one element, all of type EtlSplit
     * @return the selected split
     */
    private EtlSplit getSmallestMultiSplit(List<InputSplit> list) throws IOException {
        EtlSplit smallest = (EtlSplit) list.get(0);
        for (InputSplit other : list.subList(1, list.size())) {
            EtlSplit candidate = (EtlSplit) other;
            long smallestLength = smallest.getLength();
            long candidateLength = candidate.getLength();
            boolean shorter = candidateLength < smallestLength;
            boolean sameLengthFewerRequests = candidateLength == smallestLength && candidate.getNumRequests() < smallest.getNumRequests();
            if (shorter || sameLengthFewerRequests) {
                smallest = candidate;
            }
        }
        return smallest;
    }

    /**
     * Writes the given offset keys to the sequence file {@code offsets-previous} in the job's
     * output directory, creating that directory first when it does not exist.
     *
     * @param collection offset keys to persist
     * @param jobContext job context providing the configuration and the output path
     * @throws IOException if the directory or the file cannot be written
     */
    private void writePrevious(Collection<EtlKey> collection, JobContext jobContext) throws IOException {
        FileSystem fileSystem = FileSystem.get(jobContext.getConfiguration());
        Path outputPath = FileOutputFormat.getOutputPath(jobContext);
        // The original test was inverted: it only called mkdirs when the directory already existed.
        if (!fileSystem.exists(outputPath)) {
            fileSystem.mkdirs(outputPath);
        }
        SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, jobContext.getConfiguration(), new Path(outputPath, "offsets-previous"), EtlKey.class, NullWritable.class);
        try {
            for (EtlKey key : collection) {
                writer.append(key, NullWritable.get());
            }
        } finally {
            // Close even when an append fails so the underlying stream is not leaked.
            writer.close();
        }
    }

    /**
     * Writes the given requests to the sequence file {@code EtlMultiOutputFormat.REQUESTS_FILE}
     * in the job's output directory, creating that directory first when it does not exist.
     *
     * @param list requests to persist
     * @param jobContext job context providing the configuration and the output path
     * @throws IOException if the directory or the file cannot be written
     */
    private void writeRequests(List<EtlRequest> list, JobContext jobContext) throws IOException {
        FileSystem fileSystem = FileSystem.get(jobContext.getConfiguration());
        Path outputPath = FileOutputFormat.getOutputPath(jobContext);
        // The original test was inverted: it only called mkdirs when the directory already existed.
        if (!fileSystem.exists(outputPath)) {
            fileSystem.mkdirs(outputPath);
        }
        SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, jobContext.getConfiguration(), new Path(outputPath, EtlMultiOutputFormat.REQUESTS_FILE), EtlRequest.class, NullWritable.class);
        try {
            for (EtlRequest request : list) {
                writer.append(request, NullWritable.get());
            }
        } finally {
            // Close even when an append fails so the underlying stream is not leaked.
            writer.close();
        }
    }

    /**
     * Reads the offset files found directly under the given paths and returns, for every
     * topic/leader/partition, the stored key with the highest offset.
     *
     * <p>Only files accepted by {@code OffsetFileFilter} are read.
     *
     * @param pathArr directories to scan
     * @param jobContext job context providing the configuration
     * @return map from request (topic, leader id, partition) to its highest stored offset key
     * @throws IOException if a directory cannot be listed or a file cannot be read
     */
    private Map<EtlRequest, EtlKey> getPreviousOffsets(Path[] pathArr, JobContext jobContext) throws IOException {
        Map<EtlRequest, EtlKey> latestByRequest = new HashMap<>();
        for (Path path : pathArr) {
            FileSystem fileSystem = path.getFileSystem(jobContext.getConfiguration());
            for (FileStatus fileStatus : fileSystem.listStatus(path, new OffsetFileFilter())) {
                log.info("previous offset file:" + fileStatus.getPath().toString());
                SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, fileStatus.getPath(), jobContext.getConfiguration());
                try {
                    EtlKey key = new EtlKey();
                    // Stop at end of file; the former while(true) had no exit and never finished.
                    while (reader.next(key, NullWritable.get())) {
                        EtlRequest request = new EtlRequest(jobContext, key.getTopic(), key.getLeaderId(), key.getPartition());
                        EtlKey known = latestByRequest.get(request);
                        if (known == null || known.getOffset() < key.getOffset()) {
                            latestByRequest.put(request, key);
                        }
                        // The map may now hold this instance, so read the next record into a fresh one.
                        key = new EtlKey();
                    }
                } finally {
                    reader.close();
                }
            }
        }
        return latestByRequest;
    }

    /**
     * Stores the "move to latest offset" topic list in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param str value to store under {@code kafka.move.to.last.offset.list}
     */
    public static void setMoveToLatestTopics(JobContext jobContext, String str) {
        jobContext
                .getConfiguration()
                .set(KAFKA_MOVE_TO_LAST_OFFSET_LIST, str);
    }

    /**
     * Reads the "move to latest offset" topic list from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return whatever {@code getStrings} yields for {@code kafka.move.to.last.offset.list}
     */
    public static String[] getMoveToLatestTopics(JobContext jobContext) {
        final String[] topics = jobContext.getConfiguration().getStrings(KAFKA_MOVE_TO_LAST_OFFSET_LIST);
        return topics;
    }

    /**
     * Stores the Kafka client buffer size in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param i value to store under {@code kafka.client.buffer.size}
     */
    public static void setKafkaClientBufferSize(JobContext jobContext, int i) {
        jobContext
                .getConfiguration()
                .setInt(KAFKA_CLIENT_BUFFER_SIZE, i);
    }

    /**
     * Reads the Kafka client buffer size from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the value of {@code kafka.client.buffer.size}, or 2097152 when it is not set
     */
    public static int getKafkaClientBufferSize(JobContext jobContext) {
        final int fallback = 2097152;
        return jobContext.getConfiguration().getInt(KAFKA_CLIENT_BUFFER_SIZE, fallback);
    }

    /**
     * Stores the Kafka client socket timeout in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param i value to store under {@code kafka.client.so.timeout}
     */
    public static void setKafkaClientTimeout(JobContext jobContext, int i) {
        jobContext
                .getConfiguration()
                .setInt(KAFKA_CLIENT_SO_TIMEOUT, i);
    }

    /**
     * Reads the Kafka client socket timeout from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the value of {@code kafka.client.so.timeout}, or 60000 when it is not set
     */
    public static int getKafkaClientTimeout(JobContext jobContext) {
        final int fallback = 60000;
        return jobContext.getConfiguration().getInt(KAFKA_CLIENT_SO_TIMEOUT, fallback);
    }

    /**
     * Stores the maximum pull hours setting in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param i value to store under {@code kafka.max.pull.hrs}
     */
    public static void setKafkaMaxPullHrs(JobContext jobContext, int i) {
        jobContext
                .getConfiguration()
                .setInt(KAFKA_MAX_PULL_HRS, i);
    }

    /**
     * Reads the maximum pull hours setting from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the value of {@code kafka.max.pull.hrs}, or -1 when it is not set
     */
    public static int getKafkaMaxPullHrs(JobContext jobContext) {
        final int unset = -1;
        return jobContext.getConfiguration().getInt(KAFKA_MAX_PULL_HRS, unset);
    }

    /**
     * Stores the maximum pull minutes per task setting in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param i value to store under {@code kafka.max.pull.minutes.per.task}
     */
    public static void setKafkaMaxPullMinutesPerTask(JobContext jobContext, int i) {
        jobContext
                .getConfiguration()
                .setInt(KAFKA_MAX_PULL_MINUTES_PER_TASK, i);
    }

    /**
     * Reads the maximum pull minutes per task setting from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the value of {@code kafka.max.pull.minutes.per.task}, or -1 when it is not set
     */
    public static int getKafkaMaxPullMinutesPerTask(JobContext jobContext) {
        final int unset = -1;
        return jobContext.getConfiguration().getInt(KAFKA_MAX_PULL_MINUTES_PER_TASK, unset);
    }

    /**
     * Stores the maximum historical days setting in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param i value to store under {@code kafka.max.historical.days}
     */
    public static void setKafkaMaxHistoricalDays(JobContext jobContext, int i) {
        jobContext
                .getConfiguration()
                .setInt(KAFKA_MAX_HISTORICAL_DAYS, i);
    }

    /**
     * Reads the maximum historical days setting from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the value of {@code kafka.max.historical.days}, or -1 when it is not set
     */
    public static int getKafkaMaxHistoricalDays(JobContext jobContext) {
        final int unset = -1;
        return jobContext.getConfiguration().getInt(KAFKA_MAX_HISTORICAL_DAYS, unset);
    }

    /**
     * Stores the topic blacklist in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param str value to store under {@code kafka.blacklist.topics}
     */
    public static void setKafkaBlacklistTopic(JobContext jobContext, String str) {
        jobContext
                .getConfiguration()
                .set(KAFKA_BLACKLIST_TOPIC, str);
    }

    /**
     * Reads the topic blacklist from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the entries of {@code kafka.blacklist.topics} as returned by {@code getStrings},
     *     or an empty array when the property is missing or empty
     */
    public static String[] getKafkaBlacklistTopic(JobContext jobContext) {
        String raw = jobContext.getConfiguration().get(KAFKA_BLACKLIST_TOPIC);
        if (raw == null || raw.isEmpty()) {
            return new String[0];
        }
        return jobContext.getConfiguration().getStrings(KAFKA_BLACKLIST_TOPIC);
    }

    /**
     * Stores the topic whitelist in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param str value to store under {@code kafka.whitelist.topics}
     */
    public static void setKafkaWhitelistTopic(JobContext jobContext, String str) {
        jobContext
                .getConfiguration()
                .set(KAFKA_WHITELIST_TOPIC, str);
    }

    /**
     * Reads the topic whitelist from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the entries of {@code kafka.whitelist.topics} as returned by {@code getStrings},
     *     or an empty array when the property is missing or empty
     */
    public static String[] getKafkaWhitelistTopic(JobContext jobContext) {
        String raw = jobContext.getConfiguration().get(KAFKA_WHITELIST_TOPIC);
        if (raw == null || raw.isEmpty()) {
            return new String[0];
        }
        return jobContext.getConfiguration().getStrings(KAFKA_WHITELIST_TOPIC);
    }

    /**
     * Stores the "ignore schema errors" flag in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param z value to store under {@code etl.ignore.schema.errors}
     */
    public static void setEtlIgnoreSchemaErrors(JobContext jobContext, boolean z) {
        jobContext
                .getConfiguration()
                .setBoolean(ETL_IGNORE_SCHEMA_ERRORS, z);
    }

    /**
     * Reads the "ignore schema errors" flag from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the value of {@code etl.ignore.schema.errors}, or {@code false} when it is not set
     */
    public static boolean getEtlIgnoreSchemaErrors(JobContext jobContext) {
        final boolean fallback = false;
        return jobContext.getConfiguration().getBoolean(ETL_IGNORE_SCHEMA_ERRORS, fallback);
    }

    /**
     * Stores the audit "ignore service" topic list in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param str value to store under {@code etl.audit.ignore.service.topic.list}
     */
    public static void setEtlAuditIgnoreServiceTopicList(JobContext jobContext, String str) {
        jobContext
                .getConfiguration()
                .set(ETL_AUDIT_IGNORE_SERVICE_TOPIC_LIST, str);
    }

    /**
     * Reads the audit "ignore service" topic list from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the entries of {@code etl.audit.ignore.service.topic.list}; when the property is
     *     not set, the default passed to {@code getStrings} is an array holding one empty string
     */
    public static String[] getEtlAuditIgnoreServiceTopicList(JobContext jobContext) {
        final String[] fallback = new String[]{""};
        return jobContext.getConfiguration().getStrings(ETL_AUDIT_IGNORE_SERVICE_TOPIC_LIST, fallback);
    }

    /**
     * Stores the message decoder class in the job configuration.
     *
     * @param jobContext job context whose configuration is updated
     * @param cls decoder class to store under {@code camus.message.decoder.class}; registered
     *     against the {@code MessageDecoder} interface
     */
    public static void setMessageDecoderClass(JobContext jobContext, Class<MessageDecoder> cls) {
        jobContext
                .getConfiguration()
                .setClass(CAMUS_MESSAGE_DECODER_CLASS, cls, MessageDecoder.class);
    }

    /**
     * Reads the message decoder class from the job configuration.
     *
     * @param jobContext job context whose configuration is read
     * @return the class stored under {@code camus.message.decoder.class}, or
     *     {@code KafkaAvroMessageDecoder} when the property is not set
     */
    @SuppressWarnings("unchecked") // Configuration.getClass(String, Class) only returns Class<?>.
    public static Class<MessageDecoder> getMessageDecoderClass(JobContext jobContext) {
        // The explicit cast is required for the method to compile; it is unchecked at run time,
        // exactly as in the compiled original.
        return (Class<MessageDecoder>) jobContext.getConfiguration().getClass(CAMUS_MESSAGE_DECODER_CLASS, KafkaAvroMessageDecoder.class);
    }
}
