package org.wikidata.query.rdf.tool.change;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AtomicLongMap;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.tool.Utils;
import org.wikidata.query.rdf.tool.change.Change;
import org.wikidata.query.rdf.tool.change.events.ChangeEvent;
import org.wikidata.query.rdf.tool.change.events.EventWithChronology;
import org.wikidata.query.rdf.tool.change.events.PageDeleteEvent;
import org.wikidata.query.rdf.tool.change.events.RevisionCreateEvent;
import org.wikidata.query.rdf.tool.exception.RetryableException;
import org.wikidata.query.rdf.tool.wikibase.WikibaseRepository;

/* loaded from: input_file:org/wikidata/query/rdf/tool/change/KafkaPoller.class */
public class KafkaPoller implements Change.Source<Batch> {
    private static final Logger log = LoggerFactory.getLogger(KafkaPoller.class);

    /** System property that overrides the Kafka {@code max.poll.records} setting. */
    private static final String MAX_POLL_PROPERTY = KafkaPoller.class.getName() + ".maxPoll";
    /** System property that overrides the Kafka {@code max.partition.fetch.bytes} setting. */
    private static final String MAX_FETCH_PROPERTY = KafkaPoller.class.getName() + ".maxFetch";
    /** System property that overrides {@link #reportingTopic}. */
    private static final String REPORTING_TOPIC_PROP = KafkaPoller.class.getName() + ".reportingTopic";
    private static final String DEFAULT_REPORTING_TOPIC = "mediawiki.revision-create";
    /** Topics consumed when no cluster names are given, mapped to the event class used to decode them. */
    private static final Map<String, Class<? extends ChangeEvent>> defaultTopics = ImmutableMap.of(DEFAULT_REPORTING_TOPIC, RevisionCreateEvent.class, "mediawiki.page-delete", PageDeleteEvent.class, "mediawiki.page-undelete", RevisionCreateEvent.class);

    /** Point in time the very first batch starts from. */
    private final Instant firstStartTime;
    /** Number of collected changes at which a fetch stops polling. */
    private final int batchSize;
    private final Consumer<String, ChangeEvent> consumer;
    /** Storage the offsets of processed batches are loaded from and saved to. */
    private final KafkaOffsetsRepository kafkaOffsetsRepository;
    /** Used to keep only events for the configured host and its entity namespaces. */
    private final WikibaseRepository.Uris uris;

    /** All partitions of all topics this poller reads. */
    @Nonnull
    private final ImmutableList<TopicPartition> topicPartitions;
    /** When true, offsets are resolved from the start time only, never from the repository. */
    private final boolean ignoreStoredOffsets;
    /** Counts every record returned by Kafka, before filtering. */
    private final Counter changesCounter;
    /** Times each call to the consumer's poll. */
    private final Timer pollingTimer;
    /** Offsets of finished batches waiting to be committed to Kafka on the next fetch or on close. */
    private final Queue<Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = new ConcurrentLinkedQueue<>();
    /** Topic suffix whose record timestamps determine how far a batch has advanced. */
    private final String reportingTopic = System.getProperty(REPORTING_TOPIC_PROP, DEFAULT_REPORTING_TOPIC);

    /**
     * Batch of changes read from Kafka, remembering where reading stopped.
     *
     * <p>Besides the data handed to the superclass it carries the instant the next
     * batch should continue from and the Kafka offsets reached while building it.
     */
    public static final class Batch extends Change.Batch.AbstractDefaultImplementation {
        /** Offsets recorded for the partitions read while assembling this batch. */
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
        /** Instant the following batch resumes from. */
        private final Instant leftOffDate;

        /**
         * @param immutableList changes contained in the batch (passed to the superclass)
         * @param j how far the batch advanced, expressed in {@link #advancedUnits()} (passed to the superclass)
         * @param str textual left-off marker (passed to the superclass)
         * @param instant instant the next batch continues from
         * @param map Kafka offsets reached by this batch, keyed by partition
         */
        public Batch(ImmutableList<Change> immutableList, long j, String str, Instant instant, Map<TopicPartition, OffsetAndMetadata> map) {
            super(immutableList, j, str);
            offsets = map;
            leftOffDate = instant;
        }

        /** @return the instant the next batch continues from */
        @Override
        public Instant leftOffDate() {
            return leftOffDate;
        }

        /** @return the unit of the advanced amount, always {@code "milliseconds"} */
        @Override
        public String advancedUnits() {
            return "milliseconds";
        }
    }

    /**
     * Creates a poller on top of an already configured consumer.
     *
     * <p>Registers the {@code kafka-changes-counter} counter and the {@code kafka-changes-timer}
     * timer, then asks the consumer for the partitions of every given topic.
     *
     * @param consumer Kafka consumer to read change events from
     * @param uris used to filter events by host and entity namespace
     * @param instant point in time the first batch starts from
     * @param i number of collected changes at which a fetch stops polling
     * @param collection names of the topics to read
     * @param kafkaOffsetsRepository storage for the offsets of processed batches
     * @param z when true, offsets stored in the repository are ignored on start
     * @param metricRegistry registry the poller's metrics are registered in
     */
    public KafkaPoller(Consumer<String, ChangeEvent> consumer, WikibaseRepository.Uris uris, Instant instant, int i, Collection<String> collection, KafkaOffsetsRepository kafkaOffsetsRepository, boolean z, MetricRegistry metricRegistry) {
        // Plain field wiring first; none of these can fail.
        this.consumer = consumer;
        this.uris = uris;
        this.kafkaOffsetsRepository = kafkaOffsetsRepository;
        firstStartTime = instant;
        batchSize = i;
        ignoreStoredOffsets = z;

        // Metrics are registered before the partition lookup, which talks to the consumer.
        changesCounter = metricRegistry.counter("kafka-changes-counter");
        pollingTimer = metricRegistry.timer("kafka-changes-timer");
        topicPartitions = topicsToPartitions(collection, consumer);
    }

    /**
     * Builds the topic-to-event-class map, optionally prefixing every topic with cluster names.
     *
     * <p>With no cluster names the default topics are returned unchanged. Otherwise every
     * default topic is expanded to one {@code <cluster>.<topic>} entry per cluster name.
     *
     * @param clusterNames cluster names to prefix topics with; may be null or empty
     * @return immutable map from topic name to the event class decoding that topic
     */
    private static Map<String, Class<? extends ChangeEvent>> clusterNamesAwareTopics(Collection<String> clusterNames) {
        if (clusterNames == null || clusterNames.isEmpty()) {
            return defaultTopics;
        }
        ImmutableMap.Builder<String, Class<? extends ChangeEvent>> prefixed = ImmutableMap.builder();
        for (Map.Entry<String, Class<? extends ChangeEvent>> topic : defaultTopics.entrySet()) {
            for (String cluster : clusterNames) {
                prefixed.put(cluster + "." + topic.getKey(), topic.getValue());
            }
        }
        return prefixed.build();
    }

    /**
     * Creates the Kafka consumer used by the poller.
     *
     * <p>Auto commit is disabled. The maximum number of polled records defaults to the
     * batch size and the maximum fetch size per partition to {@code batchSize * 1024} bytes;
     * both can be overridden through system properties.
     *
     * @param servers value for {@code bootstrap.servers}
     * @param consumerId value for {@code group.id}
     * @param topicToClass event class used to deserialize each topic
     * @param batchSize batch size the poll and fetch limits are derived from
     * @return a new consumer with string keys and change event values
     */
    private static KafkaConsumer<String, ChangeEvent> buildKafkaConsumer(String servers, String consumerId, Map<String, Class<? extends ChangeEvent>> topicToClass, int batchSize) {
        String maxPollRecords = System.getProperty(MAX_POLL_PROPERTY, String.valueOf(batchSize));
        String maxFetchBytes = System.getProperty(MAX_FETCH_PROPERTY, String.valueOf(batchSize * 1024));

        Properties config = new Properties();
        config.setProperty("bootstrap.servers", servers);
        config.setProperty("group.id", consumerId);
        config.setProperty("max.poll.interval.ms", "600000");
        config.setProperty("enable.auto.commit", "false");
        config.setProperty("max.poll.records", maxPollRecords);
        config.setProperty("max.partition.fetch.bytes", maxFetchBytes);

        log.info("Creating consumer {}", consumerId);
        return new KafkaConsumer<>(config, new StringDeserializer(), new JsonDeserializer(topicToClass));
    }

    /**
     * Builds a poller together with its Kafka consumer.
     *
     * @param str Kafka bootstrap servers
     * @param str2 consumer (group) id; must not be null
     * @param collection cluster names used to prefix the default topics; may be null or empty
     * @param uris used to filter events by host and entity namespace
     * @param i batch size
     * @param instant point in time the first batch starts from
     * @param z when true, offsets stored in the repository are ignored on start
     * @param kafkaOffsetsRepository storage for the offsets of processed batches
     * @param metricRegistry registry the poller's metrics are registered in
     * @return the configured poller
     * @throws IllegalArgumentException if the consumer id is null
     */
    @Nonnull
    public static KafkaPoller buildKafkaPoller(String str, String str2, Collection<String> collection, WikibaseRepository.Uris uris, int i, Instant instant, boolean z, KafkaOffsetsRepository kafkaOffsetsRepository, MetricRegistry metricRegistry) {
        if (str2 == null) {
            throw new IllegalArgumentException("Consumer ID (--consumer) must be set");
        }
        Map<String, Class<? extends ChangeEvent>> topicToClass = clusterNamesAwareTopics(collection);
        KafkaConsumer<String, ChangeEvent> kafkaConsumer = buildKafkaConsumer(str, str2, topicToClass, i);
        ImmutableSet<String> topicNames = ImmutableSet.copyOf(topicToClass.keySet());
        return new KafkaPoller(kafkaConsumer, uris, instant, i, topicNames, kafkaOffsetsRepository, z, metricRegistry);
    }

    /**
     * Positions the consumer and fetches the first batch.
     *
     * <p>Every partition with a known offset is sought to that offset; a partition
     * without one is sought to its end. The fetch then starts from the configured start time.
     *
     * @return the first batch of changes
     * @throws RetryableException if polling Kafka is interrupted
     */
    @Override
    public Batch firstBatch() throws RetryableException {
        Map<TopicPartition, OffsetAndTimestamp> startOffsets = fetchOffsets();
        consumer.assign(startOffsets.keySet());
        log.info("Subscribed to {} topics", startOffsets.size());
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> start : startOffsets.entrySet()) {
            TopicPartition partition = start.getKey();
            OffsetAndTimestamp offset = start.getValue();
            if (offset != null) {
                consumer.seek(partition, offset.offset());
                log.info("Set topic {} to {}", partition, offset);
                continue;
            }
            log.info("No offset for {}, starting at the end", partition);
            consumer.seekToEnd(Collections.singletonList(partition));
        }
        return fetch(firstStartTime);
    }

    /**
     * Determines the starting offset of every partition this poller reads.
     *
     * <p>Offsets stored in the repository are used for the partitions they cover, unless
     * stored offsets are ignored. Stored offsets for partitions the poller no longer reads
     * are dropped. The remaining partitions are resolved by asking Kafka for the offset
     * matching the first start time.
     *
     * @return mutable map from partition to starting offset; a value is null when no
     *         offset is known for that partition
     */
    private Map<TopicPartition, OffsetAndTimestamp> fetchOffsets() {
        Map<TopicPartition, OffsetAndTimestamp> storedOffsets;
        if (ignoreStoredOffsets) {
            storedOffsets = ImmutableMap.of();
        } else {
            storedOffsets = kafkaOffsetsRepository.load(firstStartTime);
        }

        // Explicit HashMaps: the result is extended below and must accept null values,
        // neither of which Collectors.toMap guarantees.
        Map<TopicPartition, OffsetAndTimestamp> results = new HashMap<>();
        Map<TopicPartition, Long> timesToResolve = new HashMap<>();
        // One pass over our own partitions both picks the stored offsets still relevant
        // and collects the partitions that have to be resolved by timestamp.
        for (TopicPartition partition : topicPartitions) {
            if (storedOffsets.containsKey(partition)) {
                results.put(partition, storedOffsets.get(partition));
            } else {
                timesToResolve.put(partition, firstStartTime.toEpochMilli());
            }
        }

        if (!timesToResolve.isEmpty()) {
            results.putAll(consumer.offsetsForTimes(timesToResolve));
        }
        return results;
    }

    /**
     * Fetches the batch following the given one.
     *
     * @param batch the previously returned batch
     * @return a batch starting from where {@code batch} left off
     * @throws RetryableException if polling Kafka is interrupted
     */
    @Override
    public Batch nextBatch(Batch batch) throws RetryableException {
        Instant resumeFrom = batch.leftOffDate();
        return fetch(resumeFrom);
    }

    /**
     * Marks a batch as processed.
     *
     * <p>Its offsets are queued for a later commit to Kafka and then written to the
     * offsets repository.
     *
     * @param batch the batch that has been fully processed
     */
    @Override
    public void done(Batch batch) {
        Map<TopicPartition, OffsetAndMetadata> processed = batch.offsets;
        offsetsToCommit.offer(processed);
        kafkaOffsetsRepository.store(processed);
    }

    /**
     * Polls Kafka and assembles the relevant changes into a batch.
     *
     * <p>Polling repeats until one of these holds: a poll returns no records; at least
     * {@code batchSize} changes are collected; or some changes are already collected and
     * the latest poll contributed nothing relevant. Only events for the configured host
     * and an entity namespace are kept, at most one change per entity.
     *
     * @param lastNextStartTime instant the previous batch left off at; reused as this
     *        batch's left-off instant when no record of the reporting topic was seen
     * @return the assembled batch, possibly without changes
     * @throws RetryableException if polling is interrupted or woken up
     */
    private Batch fetch(Instant lastNextStartTime) throws RetryableException {
        // Insertion order is kept so the batch preserves the order changes were (re)inserted in.
        Map<String, Change> changesByEntity = new LinkedHashMap<>();
        Instant nextInstant = Instant.EPOCH;
        AtomicLongMap<String> topicCounts = AtomicLongMap.create();
        Map<TopicPartition, OffsetAndMetadata> batchOffsets = new HashMap<>();

        while (true) {
            commitPendindOffsets();

            ConsumerRecords<String, ChangeEvent> records;
            // try-with-resources stops the timer even when poll throws.
            try (Timer.Context timing = pollingTimer.time()) {
                records = consumer.poll(1000L);
            } catch (InterruptException | WakeupException e) {
                throw new RetryableException("Error fetching recent changes", e);
            }

            int count = records.count();
            log.debug("Fetched {} records from Kafka", count);
            changesCounter.inc(count);
            if (count == 0) {
                // Nothing more available right now: return what has been collected so far.
                break;
            }

            boolean foundSomething = false;
            for (ConsumerRecord<String, ChangeEvent> kafkaRecord : records) {
                ChangeEvent event = kafkaRecord.value();
                String topic = kafkaRecord.topic();
                // Offsets are tracked for every record, including the ones filtered out below.
                batchOffsets.put(new TopicPartition(topic, kafkaRecord.partition()), new OffsetAndMetadata(kafkaRecord.offset()));
                log.trace("Got event t:{} o:{}", topic, kafkaRecord.offset());

                if (!event.domain().equals(uris.getHost()) || !uris.isEntityNamespace(event.namespace())) {
                    continue;
                }
                foundSomething = true;
                topicCounts.getAndIncrement(topic);
                // Only the reporting topic moves the batch's left-off instant forward.
                if (topic.endsWith(reportingTopic)) {
                    nextInstant = (Instant) Utils.max(nextInstant, Instant.ofEpochMilli(kafkaRecord.timestamp()));
                }

                Change change = makeChange(event, kafkaRecord.offset());
                Change dupe = changesByEntity.put(change.entityId(), change);
                // The earlier change for this entity wins when the new one has a revision
                // (> -1) and the earlier one is either newer or has revision -1.
                if (dupe != null && change.revision() > -1 && (dupe.revision() > change.revision() || dupe.revision() == -1)) {
                    // Remove first so the restored change is re-inserted at the end of the order.
                    changesByEntity.remove(change.entityId());
                    changesByEntity.put(change.entityId(), dupe);
                }
            }

            log.debug("{} records left after filtering", changesByEntity.size());
            if (changesByEntity.size() >= batchSize) {
                break;
            }
            if (!changesByEntity.isEmpty() && !foundSomething) {
                log.info("Did not find anything useful in this batch, returning existing data");
                break;
            }
        }

        // No reporting-topic record seen: stay where the previous batch left off.
        if (nextInstant.equals(Instant.EPOCH)) {
            nextInstant = lastNextStartTime;
        }

        ImmutableList<Change> changes = ImmutableList.copyOf(changesByEntity.values());
        log.info("Found {} changes", changes.size());
        if (log.isDebugEnabled()) {
            topicCounts.asMap().forEach((topic, records) -> log.debug("Topic {}: {} records", topic, records));
        }
        long advanced = ChronoUnit.MILLIS.between(lastNextStartTime, nextInstant);
        return new Batch(changes, advanced, nextInstant.minusSeconds(1L).toString(), nextInstant, batchOffsets);
    }

    /**
     * Converts a Kafka change event into a {@link Change}.
     *
     * @param event the event read from Kafka
     * @param position Kafka offset of the record carrying the event
     * @return the change; it also carries the chronology id when the event provides one
     */
    private Change makeChange(ChangeEvent event, long position) {
        if (event instanceof EventWithChronology) {
            String chronologyId = ((EventWithChronology) event).chronologyId();
            return new Change(event.title(), event.revision(), event.timestamp(), position, chronologyId);
        }
        return new Change(event.title(), event.revision(), event.timestamp(), position);
    }

    /**
     * Lists every partition of the given topics.
     *
     * @param topics names of the topics to look up
     * @param consumer consumer asked for each topic's partitions
     * @return the partitions of all topics, in topic order
     */
    private static ImmutableList<TopicPartition> topicsToPartitions(Collection<String> topics, Consumer<String, ChangeEvent> consumer) {
        ImmutableList.Builder<TopicPartition> partitions = ImmutableList.builder();
        for (String topic : topics) {
            consumer.partitionsFor(topic)
                    .forEach(info -> partitions.add(new TopicPartition(info.topic(), info.partition())));
        }
        return partitions.build();
    }

    /**
     * Asynchronously commits to Kafka every offsets map queued by finished batches.
     *
     * <p>Commit failures are logged and otherwise ignored.
     */
    private void commitPendindOffsets() {
        Map<TopicPartition, OffsetAndMetadata> pending;
        while ((pending = offsetsToCommit.poll()) != null) {
            consumer.commitAsync(pending, (committed, failure) -> {
                // The callback also fires on success, with a null exception; only warn on real failures.
                if (failure != null) {
                    log.warn("Failed to commit offsets to kafka", failure);
                }
            });
        }
    }

    /**
     * Commits the offsets still waiting in the queue and closes the consumer.
     *
     * <p>All queued offsets maps are merged in queue order, so a later entry replaces an
     * earlier one for the same partition while partitions present only in earlier entries
     * are still committed. The consumer is closed even if the commit fails.
     */
    @Override
    public void close() {
        Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
        Map<TopicPartition, OffsetAndMetadata> queued;
        while ((queued = offsetsToCommit.poll()) != null) {
            pending.putAll(queued);
        }
        try {
            if (!pending.isEmpty()) {
                consumer.commitSync(pending);
            }
        } finally {
            // Always release the consumer, even when the final commit throws.
            consumer.close();
        }
    }
}
