package org.wikidata.query.rdf.tool;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Multimap;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.openrdf.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.common.uri.WikibaseUris;
import org.wikidata.query.rdf.tool.change.Change;
import org.wikidata.query.rdf.tool.change.Change.Batch;
import org.wikidata.query.rdf.tool.exception.ContainedException;
import org.wikidata.query.rdf.tool.exception.RetryableException;
import org.wikidata.query.rdf.tool.rdf.Munger;
import org.wikidata.query.rdf.tool.rdf.RdfRepository;
import org.wikidata.query.rdf.tool.wikibase.WikibaseRepository;

/* loaded from: input_file:org/wikidata/query/rdf/tool/Updater.class */
/**
 * Pulls batches of changes from a {@link Change.Source}, fetches the RDF for
 * every changed entity from the {@link WikibaseRepository}, munges it and
 * pushes the result into the {@link RdfRepository}.
 *
 * <p>The main loop lives in {@link #run()}; the per-entity work is fanned out
 * to the supplied {@link ExecutorService}.
 *
 * @param <B> type of the batches produced by the change source
 */
public class Updater<B extends Change.Batch> implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(Updater.class);

    /**
     * Factor converting {@link #pollDelay} (logged as seconds) to the
     * milliseconds expected by {@link Thread#sleep(long)}. Declared as
     * {@code long} so the multiplication cannot overflow {@code int}.
     */
    private static final long MILLIS_PER_SECOND = 1000L;

    /** Counts changes that were synced to the RDF repository. */
    private final Meter updatesMeter;
    /** Counts how far each batch advanced, in the batch's own units. */
    private final Meter batchAdvanced;
    private final Change.Source<B> changeSource;
    private final WikibaseRepository wikibase;
    private final RdfRepository rdfRepository;
    private final Munger munger;
    private final ExecutorService executor;
    /** Delay between polls when the source reports no changes; logged as seconds. */
    private final int pollDelay;
    private final WikibaseUris uris;
    /** Changes handed to the munger for possible deferral; drained at the start of each batch. */
    private final DelayQueue<Change.DelayedChange> deferralQueue = new DelayQueue<>();
    /** Values known to the repository for the current batch; guarded by {@code this}. */
    private ImmutableSetMultimap<String, String> repoValues;
    /** References known to the repository for the current batch; guarded by {@code this}. */
    private ImmutableSetMultimap<String, String> repoRefs;
    /** Passed through to {@link RdfRepository#syncFromChanges}. */
    private final boolean verify;

    /**
     * Creates an updater.
     *
     * @param source source of change batches
     * @param wikibaseRepository repository the entity RDF is fetched from
     * @param rdfRepository repository the munged RDF is synced to
     * @param munger munger applied to the fetched statements
     * @param executorService executor running the per-change work
     * @param pollDelay delay between polls when there are no changes, in seconds
     * @param wikibaseUris URI scheme used to build entity URIs
     * @param verify forwarded to the RDF repository on every sync
     * @param metricRegistry registry providing the "updates" and "batch-progress" meters
     */
    public Updater(Change.Source<B> source, WikibaseRepository wikibaseRepository, RdfRepository rdfRepository,
            Munger munger, ExecutorService executorService, int pollDelay, WikibaseUris wikibaseUris,
            boolean verify, MetricRegistry metricRegistry) {
        this.changeSource = source;
        this.wikibase = wikibaseRepository;
        this.rdfRepository = rdfRepository;
        this.munger = munger;
        this.executor = executorService;
        this.pollDelay = pollDelay;
        this.uris = wikibaseUris;
        this.verify = verify;
        this.updatesMeter = metricRegistry.meter("updates");
        this.batchAdvanced = metricRegistry.meter("batch-progress");
    }

    /**
     * Processes batches until the source reports the last batch or the
     * current thread is interrupted.
     */
    @Override
    public void run() {
        B batch = null;
        do {
            try {
                batch = this.changeSource.firstBatch();
            } catch (RetryableException e) {
                log.warn("Retryable error fetching first batch.  Retrying.", e);
            }
        } while (batch == null);
        log.debug("{} changes in batch", batch.changes().size());

        Instant lastSyncedDate = null;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                handleChanges(addDeferredChanges(this.deferralQueue, batch.changes()));
                Instant leftOffDate = batch.leftOffDate();
                if (leftOffDate != null) {
                    // Record a point one second earlier than reported
                    // (presumably to tolerate coarse poll resolution - TODO confirm).
                    Instant syncPoint = leftOffDate.minusSeconds(1L);
                    // Do not rewrite the same date to the repository.
                    if (!syncPoint.equals(lastSyncedDate)) {
                        syncDate(syncPoint);
                        lastSyncedDate = syncPoint;
                    }
                }
                this.batchAdvanced.mark(batch.advanced());
                log.info("Polled up to {} at {} updates per second and {} {} per second",
                        batch.leftOffHuman(), meterReport(this.updatesMeter),
                        meterReport(this.batchAdvanced), batch.advancedUnits());
                // Everything below must stay inside the try: an interrupted
                // batch must not be acknowledged, and nextBatch() itself
                // throws InterruptedException.
                if (batch.last()) {
                    return;
                }
                this.wikibase.batchDone();
                batch = nextBatch(batch);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the run.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Merges deferred changes whose delay has expired into a batch of new
     * changes.
     *
     * <p>A deferred change is dropped when the new batch already contains a
     * change for the same entity id.
     *
     * @param delayQueue queue of deferred changes; all expired elements are removed
     * @param collection changes of the current batch; never modified
     * @return {@code collection} itself if the queue is empty, otherwise a new
     *         collection holding the batch changes followed by the accepted
     *         deferred changes
     */
    @VisibleForTesting
    static Collection<Change> addDeferredChanges(DelayQueue<Change.DelayedChange> delayQueue,
            Collection<Change> collection) {
        if (delayQueue.isEmpty()) {
            return collection;
        }
        List<Change> allChanges = new LinkedList<>(collection);
        Set<String> batchEntityIds = collection.stream()
                .map(Change::entityId)
                .collect(ImmutableSet.toImmutableSet());
        int deferrals = 0;
        // DelayQueue.poll() only hands out elements whose delay has expired
        // and returns null once none are left.
        for (Change.DelayedChange deferred = delayQueue.poll(); deferred != null; deferred = delayQueue.poll()) {
            Change deferredChange = deferred.getChange();
            if (!batchEntityIds.contains(deferredChange.entityId())) {
                allChanges.add(deferredChange);
                deferrals++;
            }
        }
        log.info("Added {} deferred changes, {} still in the queue", deferrals, delayQueue.size());
        return allChanges;
    }

    /**
     * Records in the RDF repository how far the updater has progressed.
     *
     * @param instant the left-off time to store
     */
    protected void syncDate(Instant instant) {
        this.rdfRepository.updateLeftOffTime(instant);
    }

    /**
     * Shuts down the executor and closes the change source.
     *
     * @throws IOException if closing the change source fails
     */
    @Override
    public void close() throws IOException {
        this.executor.shutdown();
        this.changeSource.close();
    }

    /**
     * Fetches and munges the data of every relevant change in parallel, then
     * syncs the successfully processed changes to the RDF repository.
     *
     * <p>A change whose processing fails is skipped; the remaining changes of
     * the batch are still synced.
     *
     * @param iterable changes to process
     * @throws InterruptedException if interrupted while waiting for a worker
     */
    protected void handleChanges(Iterable<Change> iterable) throws InterruptedException {
        Set<Change> trueChanges = getRevisionUpdates(iterable);
        long start = System.currentTimeMillis();

        List<Future<Change>> futureChanges = new ArrayList<>();
        for (Change change : trueChanges) {
            futureChanges.add(this.executor.submit(() -> {
                // Retryable errors are retried indefinitely; contained errors abort this change only.
                while (true) {
                    try {
                        handleChange(change);
                        return change;
                    } catch (ContainedException e) {
                        log.warn("Contained error syncing.  Giving up on {}", change.entityId(), e);
                        throw e;
                    } catch (RetryableException e) {
                        log.warn("Retryable error syncing.  Retrying.", e);
                    }
                }
            }));
        }

        List<Change> processedChanges = new ArrayList<>();
        for (Future<Change> future : futureChanges) {
            try {
                processedChanges.add(future.get());
            } catch (ExecutionException e) {
                // ContainedException was already logged by the task itself;
                // any other failure would otherwise disappear without a trace.
                if (!(e.getCause() instanceof ContainedException)) {
                    log.warn("Unexpected error syncing.  Skipping change.", e.getCause());
                }
            }
        }

        log.debug("Preparing update data took {} ms, have {} changes",
                System.currentTimeMillis() - start, processedChanges.size());
        this.rdfRepository.syncFromChanges(processedChanges, this.verify);
        this.updatesMeter.mark(processedChanges.size());
    }

    /**
     * Selects the changes that need processing and loads the repository
     * values and references for their entities.
     *
     * <p>Changes without a revision (revision &lt;= -1) are always selected.
     * For changes with a revision only the highest revision per entity is
     * kept, and of those only the ones returned by
     * {@link RdfRepository#hasRevisions} are selected.
     *
     * @param iterable candidate changes
     * @return the changes to process
     */
    private Set<Change> getRevisionUpdates(Iterable<Change> iterable) {
        Set<Change> trueChanges = new HashSet<>();
        Set<String> trueChangeUris = new HashSet<>();
        Map<String, Change> candidatesById = new HashMap<>();

        for (Change change : iterable) {
            if (change.revision() > -1) {
                Change known = candidatesById.get(change.entityId());
                if (known == null || known.revision() < change.revision()) {
                    candidatesById.put(change.entityId(), change);
                }
            } else {
                trueChanges.add(change);
                trueChangeUris.add(this.uris.entity() + change.entityId());
            }
        }

        if (!candidatesById.isEmpty()) {
            for (String entityUri : this.rdfRepository.hasRevisions(candidatesById.values())) {
                trueChangeUris.add(entityUri);
                // Strip the entity URI prefix to get back to the bare entity id.
                trueChanges.add(candidatesById.get(entityUri.substring(this.uris.entity().length())));
            }
        }

        log.debug("Filtered batch contains {} changes", trueChanges.size());
        if (trueChanges.isEmpty()) {
            setValuesAndRefs(null, null);
        } else {
            setValuesAndRefs(this.rdfRepository.getValues(trueChangeUris),
                    this.rdfRepository.getRefs(trueChangeUris));
            if (log.isDebugEnabled()) {
                synchronized (this) {
                    log.debug("Fetched {} values", this.repoValues.size());
                    log.debug("Fetched {} refs", this.repoRefs.size());
                }
            }
        }
        return trueChanges;
    }

    /**
     * Replaces the cached repository values and references under the
     * instance lock.
     *
     * @param immutableSetMultimap new values, may be {@code null}
     * @param immutableSetMultimap2 new references, may be {@code null}
     */
    private synchronized void setValuesAndRefs(ImmutableSetMultimap<String, String> immutableSetMultimap,
            ImmutableSetMultimap<String, String> immutableSetMultimap2) {
        this.repoValues = immutableSetMultimap;
        this.repoRefs = immutableSetMultimap2;
    }

    /**
     * Fetches the next batch that actually contains changes.
     *
     * <p>Sleeps for the poll delay whenever the source reports no changes at
     * all, and moves on from batches that report changes but carry none.
     *
     * @param b the batch that was just processed
     * @return the next batch with a non-empty change list
     * @throws InterruptedException if interrupted while sleeping
     */
    private B nextBatch(B b) throws InterruptedException {
        B current = b;
        while (true) {
            try {
                B next = this.changeSource.nextBatch(current);
                if (!next.hasAnyChanges()) {
                    log.info("Sleeping for {} secs", this.pollDelay);
                    Thread.sleep(this.pollDelay * MILLIS_PER_SECOND);
                } else if (!next.changes().isEmpty()) {
                    log.debug("{} changes in batch", next.changes().size());
                    return next;
                } else {
                    // The source advanced but yielded nothing usable; continue from there.
                    current = next;
                }
            } catch (RetryableException e) {
                log.warn("Retryable error fetching next batch.  Retrying.", e);
            }
        }
    }

    /**
     * Fetches the RDF for one change, munges it and stores the statements
     * and cleanup lists on the change.
     *
     * @param change the change to process
     * @throws RetryableException if the fetch or munge step reports a retryable failure
     */
    private void handleChange(Change change) throws RetryableException {
        log.debug("Processing data for {}", change);
        Collection<Statement> statements = this.wikibase.fetchRdfForEntity(change.entityId());
        Set<String> valueCleanupList = new HashSet<>();
        Set<String> refCleanupList = new HashSet<>();
        Multimap<String, String> values;
        Multimap<String, String> refs;
        // Take a consistent snapshot of both maps; they are replaced together under the same lock.
        synchronized (this) {
            values = this.repoValues;
            refs = this.repoRefs;
        }
        this.munger.mungeWithValues(change.entityId(), statements, values, refs, valueCleanupList,
                refCleanupList, change, this.deferralQueue);
        change.setRefCleanupList(refCleanupList);
        change.setValueCleanupList(valueCleanupList);
        change.setStatements(statements);
    }

    /**
     * Formats the 1, 5 and 15 minute rates of a meter.
     *
     * @param meter the meter to report on
     * @return the rates as {@code "(1min, 5min, 15min)"} with one decimal each
     */
    private String meterReport(Meter meter) {
        return String.format(Locale.ROOT, "(%.1f, %.1f, %.1f)", meter.getOneMinuteRate(),
                meter.getFiveMinuteRate(), meter.getFifteenMinuteRate());
    }
}
