package com.linkedin.camus.etl.kafka.common;

import com.linkedin.camus.coders.MessageEncoder;
import com.linkedin.camus.etl.kafka.CamusJob;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;

@JsonIgnoreProperties({"trackingCount", "lastKey", "eventCount", "RANDOM"})
/* loaded from: input_file:com/linkedin/camus/etl/kafka/common/EtlCounts.class */
public class EtlCounts {
    private static final String TOPIC = "topic";
    private static final String GRANULARITY = "granularity";
    private static final String COUNTS = "counts";
    private static final String START_TIME = "startTime";
    private static final String END_TIME = "endTime";
    private static final String FIRST_TIMESTAMP = "firstTimestamp";
    private static final String LAST_TIMESTAMP = "lastTimestamp";
    private static final String ERROR_COUNT = "errorCount";
    private static final String MONITORING_EVENT_CLASS = "monitoring.event.class";
    private String topic;
    private long startTime;
    private long granularity;
    private long errorCount;
    private long endTime;
    private long lastTimestamp;
    private long firstTimestamp;
    private HashMap<String, Source> counts;
    private transient EtlKey lastKey;
    private transient int eventCount;
    private static Logger log = Logger.getLogger(EtlCounts.class);
    private static final transient Random RANDOM = new Random();

    public EtlCounts() {
        this.eventCount = 0;
    }

    public EtlCounts(String str, long j, long j2) {
        this.eventCount = 0;
        this.topic = str;
        this.granularity = j;
        this.startTime = j2;
        this.counts = new HashMap<>();
    }

    public EtlCounts(String str, long j) {
        this(str, j, System.currentTimeMillis());
    }

    public HashMap<String, Source> getCounts() {
        return this.counts;
    }

    public long getEndTime() {
        return this.endTime;
    }

    public long getErrorCount() {
        return this.errorCount;
    }

    public long getFirstTimestamp() {
        return this.firstTimestamp;
    }

    public long getGranularity() {
        return this.granularity;
    }

    public long getLastTimestamp() {
        return this.lastTimestamp;
    }

    public long getStartTime() {
        return this.startTime;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setCounts(HashMap<String, Source> hashMap) {
        this.counts = hashMap;
    }

    public void setEndTime(long j) {
        this.endTime = j;
    }

    public void setErrorCount(long j) {
        this.errorCount = j;
    }

    public void setFirstTimestamp(long j) {
        this.firstTimestamp = j;
    }

    public void setGranularity(long j) {
        this.granularity = j;
    }

    public void setLastTimestamp(long j) {
        this.lastTimestamp = j;
    }

    public void setStartTime(long j) {
        this.startTime = j;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public int getEventCount() {
        return this.eventCount;
    }

    public EtlKey getLastKey() {
        return this.lastKey;
    }

    public void setEventCount(int i) {
        this.eventCount = i;
    }

    public void setLastKey(EtlKey etlKey) {
        this.lastKey = etlKey;
    }

    public void incrementMonitorCount(EtlKey etlKey) {
        Source source = new Source(etlKey.getServer(), etlKey.getService(), DateUtils.getPartition(this.granularity, etlKey.getTime()));
        if (this.counts.containsKey(source.toString())) {
            Source source2 = this.counts.get(source.toString());
            source2.setCount(source2.getCount() + 1);
            this.counts.put(source2.toString(), source2);
        } else {
            source.setCount(1L);
            this.counts.put(source.toString(), source);
        }
        if (etlKey.getTime() > this.lastTimestamp) {
            this.lastTimestamp = etlKey.getTime();
        }
        if (etlKey.getTime() < this.firstTimestamp) {
            this.firstTimestamp = etlKey.getTime();
        }
        this.lastKey = new EtlKey(etlKey);
        this.eventCount++;
    }

    public void writeCountsToMap(ArrayList<Map<String, Object>> arrayList, FileSystem fileSystem, Path path) throws IOException {
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC, this.topic);
        hashMap.put(GRANULARITY, Long.valueOf(this.granularity));
        hashMap.put("counts", this.counts);
        hashMap.put(START_TIME, Long.valueOf(this.startTime));
        hashMap.put(END_TIME, Long.valueOf(this.endTime));
        hashMap.put(FIRST_TIMESTAMP, Long.valueOf(this.firstTimestamp));
        hashMap.put(LAST_TIMESTAMP, Long.valueOf(this.lastTimestamp));
        hashMap.put(ERROR_COUNT, Long.valueOf(this.errorCount));
        arrayList.add(hashMap);
    }

    public void postTrackingCountToKafka(Configuration configuration, String str, String str2) {
        try {
            MessageEncoder messageEncoder = (MessageEncoder) Class.forName(configuration.get(CamusJob.CAMUS_MESSAGE_ENCODER_CLASS)).newInstance();
            Properties properties = new Properties();
            Iterator it = configuration.iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                properties.put(entry.getKey(), entry.getValue());
            }
            messageEncoder.init(properties, "TrackingMonitoringEvent");
            AbstractMonitoringEvent abstractMonitoringEvent = (AbstractMonitoringEvent) Class.forName(configuration.get(MONITORING_EVENT_CLASS)).getDeclaredConstructor(Configuration.class).newInstance(configuration);
            ArrayList<byte[]> arrayList = new ArrayList<>();
            int i = 0;
            Iterator<Map.Entry<String, Source>> it2 = getCounts().entrySet().iterator();
            while (it2.hasNext()) {
                arrayList.add(messageEncoder.toBytes(abstractMonitoringEvent.createMonitoringEventRecord(it2.next().getValue(), this.topic, this.granularity, str)));
                if (arrayList.size() >= 2000) {
                    i += arrayList.size();
                    produceCount(str2, arrayList);
                    arrayList.clear();
                }
            }
            if (arrayList.size() > 0) {
                i += arrayList.size();
                produceCount(str2, arrayList);
            }
            log.info(this.topic + " sent " + i + " counts");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void produceCount(String str, ArrayList<byte[]> arrayList) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", str);
        properties.put("producer.type", "async");
        properties.put("request.required.acks", "1");
        properties.put("request.timeout.ms", "30000");
        log.debug("Broker list: " + str);
        Producer producer = new Producer(new ProducerConfig(properties));
        try {
            try {
                Iterator<byte[]> it = arrayList.iterator();
                while (it.hasNext()) {
                    producer.send(new KeyedMessage("TrackingMonitoringEvent", it.next()));
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(this.topic + " issue sending tracking to " + str.toString());
                if (producer != null) {
                    producer.close();
                }
            }
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
