package org.wikidata.query.rdf.tool;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Multimap;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.openrdf.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.common.TimerCounter;
import org.wikidata.query.rdf.common.uri.UrisScheme;
import org.wikidata.query.rdf.tool.change.Change;
import org.wikidata.query.rdf.tool.change.Change.Batch;
import org.wikidata.query.rdf.tool.exception.ContainedException;
import org.wikidata.query.rdf.tool.exception.RetryableException;
import org.wikidata.query.rdf.tool.rdf.Munger;
import org.wikidata.query.rdf.tool.rdf.RdfRepository;
import org.wikidata.query.rdf.tool.wikibase.WikibaseRepository;

/* loaded from: input_file:org/wikidata/query/rdf/tool/Updater.class */
public class Updater<B extends Change.Batch> implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(Updater.class);

    /**
     * Delay handed to {@link DeferredChanges#add} when the RDF fetched for a
     * change carries an older revision than the change itself.
     */
    private static final long DEFERRAL_DELAY = 5;

    // --- Metrics (registered in the constructor) ---
    /** Marked with the number of changes each time an import completes. */
    private final Meter updatesMeter;
    /** Marked with {@code batch.advanced()} once a batch has been imported. */
    private final Meter batchAdvanced;
    /** Marked when the fetched RDF revision is newer than the change revision. */
    private final Meter skipAheadMeter;

    // --- Collaborators ---
    /** Source of change batches. */
    private final Change.Source<B> changeSource;
    /** Wikibase instance the entity RDF is fetched from. */
    private final WikibaseRepository wikibase;
    /** RDF repository the changes are synced into. */
    private final RdfRepository rdfRepository;
    /** Munges the fetched statements before import. */
    private final Munger munger;
    /** Runs the per-change fetch-and-munge tasks; shut down by {@link #close()}. */
    private final ExecutorService executor;

    // --- Configuration ---
    /** When true, imports run on the dedicated importer thread instead of inline. */
    private final boolean importAsync;
    /** Seconds to sleep when the change source reports no changes. */
    private final int pollDelay;
    /** Converts between entity ids and entity URIs. */
    private final UrisScheme uris;

    // --- Timers and counters ---
    private final TimerCounter wikibaseDataFetchTime;
    private final TimerCounter rdfRepositoryImportTime;
    private final TimerCounter rdfRepositoryFetchTime;
    private final Counter importedChanged;
    private final Counter noopedChangesByRevisionCheck;
    private final Counter importedTriples;

    /** Flag forwarded to {@code RdfRepository.syncFromChanges}. */
    private final boolean verify;

    /** Last date passed to {@link #syncDate(Instant)}; only touched inside that synchronized method. */
    private Instant lastRepoDate;
    /** Importer thread; {@code null} until {@code startImporter()} has run. */
    private Thread importerThread;
    /** Import tasks waiting for the importer thread; bounded to 5 pending tasks. */
    private final BlockingQueue<Runnable> importQueue = new ArrayBlockingQueue<>(5);
    /** Changes put aside to be re-injected into a later batch. */
    private final DeferredChanges deferredChanges = new DeferredChanges();

    /* loaded from: input_file:org/wikidata/query/rdf/tool/Updater$ChangesWithValuesAndRefs.class */
    /**
     * Simple holder bundling a set of changes with the value and reference
     * multimaps that were fetched from the RDF repository for them.
     */
    public static class ChangesWithValuesAndRefs {
        /** Changes that still need to be processed. */
        private final Set<Change> changes;
        /** Values multimap fetched from the RDF repository. */
        private final Multimap<String, String> repoValues;
        /** References multimap fetched from the RDF repository. */
        private final Multimap<String, String> repoRefs;

        /**
         * @param set the changes to process
         * @param multimap values fetched from the RDF repository
         * @param multimap2 references fetched from the RDF repository
         */
        public ChangesWithValuesAndRefs(
                Set<Change> set,
                Multimap<String, String> multimap,
                Multimap<String, String> multimap2) {
            changes = set;
            repoValues = multimap;
            repoRefs = multimap2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wires the updater to its collaborators and registers its metrics.
     *
     * @param source where change batches are polled from
     * @param wikibaseRepository Wikibase instance entity RDF is fetched from
     * @param rdfRepository RDF repository that receives the changes
     * @param munger munger applied to the fetched statements
     * @param executorService executor running the per-change fetch tasks
     * @param z whether imports run asynchronously on the importer thread
     * @param i seconds to sleep when a poll yields no changes
     * @param urisScheme scheme used to map entity ids to URIs and back
     * @param z2 verify flag forwarded to {@code RdfRepository.syncFromChanges}
     * @param metricRegistry registry the meters and counters are created in
     */
    public Updater(
            Change.Source<B> source,
            WikibaseRepository wikibaseRepository,
            RdfRepository rdfRepository,
            Munger munger,
            ExecutorService executorService,
            boolean z,
            int i,
            UrisScheme urisScheme,
            boolean z2,
            MetricRegistry metricRegistry) {
        // Collaborators and configuration.
        changeSource = source;
        wikibase = wikibaseRepository;
        this.rdfRepository = rdfRepository;
        this.munger = munger;
        executor = executorService;
        importAsync = z;
        pollDelay = i;
        uris = urisScheme;
        verify = z2;

        // Throughput meters.
        updatesMeter = metricRegistry.meter("updates");
        batchAdvanced = metricRegistry.meter("batch-progress");
        skipAheadMeter = metricRegistry.meter("updates-skip");

        // Time accumulators, each backed by a plain counter.
        wikibaseDataFetchTime = TimerCounter.counter(metricRegistry.counter("wikibase-data-fetch-time-cnt"));
        rdfRepositoryImportTime = TimerCounter.counter(metricRegistry.counter("rdf-repository-import-time-cnt"));
        rdfRepositoryFetchTime = TimerCounter.counter(metricRegistry.counter("rdf-repository-fetch-time-cnt"));

        // Volume counters.
        noopedChangesByRevisionCheck = metricRegistry.counter("noop-by-revision-check");
        importedChanged = metricRegistry.counter("rdf-repository-imported-changes");
        importedTriples = metricRegistry.counter("rdf-repository-imported-triples");
    }

    /**
     * Main polling loop.
     *
     * <p>Starts the importer thread when running asynchronously, fetches the
     * first batch (retrying on {@link RetryableException}), then applies
     * batches one after another until the current thread is interrupted or a
     * batch reports itself as the last one.
     *
     * <p>An {@link InterruptedException} raised while fetching or applying a
     * batch ends the loop and restores the thread's interrupt flag. Whatever
     * the exit path (normal return, interruption or any other throwable), the
     * importer thread is interrupted when running asynchronously.
     */
    @Override
    public void run() {
        if (importAsync) {
            startImporter();
        }
        try {
            B batch = null;
            do {
                try {
                    batch = changeSource.firstBatch();
                } catch (RetryableException e) {
                    log.warn("Retryable error fetching first batch.  Retrying.", e);
                }
            } while (batch == null);
            log.debug("{} changes in batch", batch.changes().size());
            while (!Thread.currentThread().isInterrupted()) {
                applyBatch(batch);
                if (batch.last()) {
                    return;
                }
                batch = nextBatch(batch);
            }
        } catch (InterruptedException e) {
            // Runnable.run() cannot propagate it: restore the flag and stop.
            Thread.currentThread().interrupt();
        } finally {
            // Always stop the importer thread, including on unexpected failures.
            if (importAsync) {
                importerThread.interrupt();
            }
        }
    }

    /**
     * Processes one batch: its changes, augmented with previously deferred
     * ones, are handed to {@link #handleChanges} together with a completion
     * callback.
     *
     * <p>The callback records the left-off date (if any), marks progress, logs
     * the polling rates, tells the change source the batch is done and, unless
     * this is the last batch, notifies the Wikibase repository.
     *
     * <p>For the last batch in asynchronous mode this method waits until the
     * callback has run, checking once per second that the importer thread is
     * still alive.
     *
     * @param batch the batch to apply
     * @throws InterruptedException if interrupted while handling the changes
     *     or while waiting for the import to finish
     */
    private void applyBatch(B batch) throws InterruptedException {
        final CountDownLatch imported = new CountDownLatch(1);
        final Collection<Change> toProcess = deferredChanges.augmentWithDeferredChanges(batch.changes());

        final Runnable onImported = () -> {
            final Instant leftOff = batch.leftOffDate();
            if (leftOff != null) {
                syncDate(leftOff);
            }
            batchAdvanced.mark(batch.advanced());
            log.info("Polled up to {} at {} updates per second and {} {} per second",
                    batch.leftOffHuman(),
                    meterReport(updatesMeter),
                    meterReport(batchAdvanced),
                    batch.advancedUnits());
            changeSource.done(batch);
            imported.countDown();
            if (!batch.last()) {
                wikibase.batchDone();
            }
        };
        handleChanges(toProcess, onImported);

        if (!batch.last() || !importAsync) {
            return;
        }
        // Last batch, asynchronous import: do not return before it is stored.
        while (!imported.await(1L, TimeUnit.SECONDS)) {
            checkImporterAlive();
        }
    }

    /**
     * Fails fast when the importer thread is no longer running.
     *
     * @throws RuntimeException if the importer thread is not alive
     */
    private void checkImporterAlive() {
        if (importerThread.isAlive()) {
            return;
        }
        throw new RuntimeException("Imported thread died, cannot continue");
    }

    /**
     * Creates and starts the "Importer" thread, which takes tasks from the
     * import queue and runs them until it is interrupted. Uncaught errors in
     * that thread are logged.
     *
     * @throws IllegalStateException if the importer thread already exists
     */
    private void startImporter() {
        if (importerThread != null) {
            throw new IllegalStateException("Importer thread already created");
        }
        final Runnable drainQueue = () -> {
            try {
                while (!Thread.interrupted()) {
                    importQueue.take().run();
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for work: let the thread end.
            }
        };
        importerThread = new Thread(drainQueue, "Importer");
        importerThread.setUncaughtExceptionHandler((thread, error) -> log.error("Importer error", error));
        importerThread.start();
    }

    /**
     * Records a new left-off time in the RDF repository, but only when the
     * given instant is later than the last one recorded by this updater. The
     * value stored in the repository is one second earlier than the instant.
     *
     * @param instant the date the updater has caught up to
     */
    protected synchronized void syncDate(Instant instant) {
        final boolean notNewer = lastRepoDate != null && !instant.isAfter(lastRepoDate);
        if (notNewer) {
            return;
        }
        rdfRepository.updateLeftOffTime(instant.minusSeconds(1L));
        lastRepoDate = instant;
    }

    /**
     * Shuts down the executor, then closes the change source.
     *
     * @throws IOException if closing the change source fails
     */
    @Override
    public void close() throws IOException {
        executor.shutdown();
        changeSource.close();
    }

    /**
     * Runs a collection of changes through the pipeline: filter them against
     * the RDF repository, fetch and munge the Wikibase data, then import the
     * result.
     *
     * <p>The number of changes removed by the revision check is added to the
     * no-op counter. The import runs inline when the updater is synchronous;
     * otherwise it is offered to the import queue, retrying every second and
     * checking that the importer thread is still alive.
     *
     * @param collection the changes to handle
     * @param runnable callback run once the import has completed
     * @throws InterruptedException if interrupted while fetching data or while
     *     waiting for room in the import queue
     */
    protected void handleChanges(Collection<Change> collection, Runnable runnable) throws InterruptedException {
        final ChangesWithValuesAndRefs trueChanges =
                (ChangesWithValuesAndRefs) rdfRepositoryFetchTime.time(() -> getRevisionUpdates(collection));
        noopedChangesByRevisionCheck.inc(collection.size() - trueChanges.changes.size());

        // Typed instead of raw so the hand-off to importToRdfRepository is checked.
        final List<Change> munged =
                (List<Change>) wikibaseDataFetchTime.timeCheckedCallable(() -> fetchDataFromWikibaseAndMunge(trueChanges));

        final Runnable importTask = () -> importToRdfRepository(munged, runnable);
        if (!importAsync) {
            importTask.run();
            return;
        }
        while (!importQueue.offer(importTask, 1L, TimeUnit.SECONDS)) {
            checkImporterAlive();
        }
    }

    /**
     * Syncs the given changes into the RDF repository (timed), updates the
     * import metrics and finally runs the completion callback.
     *
     * @param changes the fully fetched and munged changes
     * @param onDone callback to run after the metrics have been updated
     */
    private void importToRdfRepository(List<Change> changes, Runnable onDone) {
        final int tripleCount =
                (Integer) rdfRepositoryImportTime.time(() -> rdfRepository.syncFromChanges(changes, verify));
        final int changeCount = changes.size();
        updatesMeter.mark(changeCount);
        importedChanged.inc(changeCount);
        importedTriples.inc(tripleCount);
        onDone.run();
    }

    /**
     * Fetches and munges the Wikibase data for every change, one executor task
     * per change, and returns the changes whose task succeeded.
     *
     * <p>Each task retries on {@link RetryableException} and gives up on
     * {@link ContainedException}. A change whose task failed is left out of
     * the returned list.
     *
     * @param trueChanges the changes plus the repository values/refs they need
     * @return the successfully processed changes, in submission order
     * @throws InterruptedException if interrupted while waiting for a task
     */
    private List<Change> fetchDataFromWikibaseAndMunge(ChangesWithValuesAndRefs trueChanges) throws InterruptedException {
        final List<Future<Change>> pending = new ArrayList<>(trueChanges.changes.size());
        for (Change change : trueChanges.changes) {
            pending.add(executor.submit(() -> {
                while (true) {
                    try {
                        handleChange(change, trueChanges.repoValues, trueChanges.repoRefs);
                        return change;
                    } catch (ContainedException e) {
                        log.warn("Contained error syncing.  Giving up on {}", change.entityId(), e);
                        throw e;
                    } catch (RetryableException e) {
                        log.warn("Retryable error syncing.  Retrying.", e);
                    }
                }
            }));
        }

        final List<Change> processed = new ArrayList<>(pending.size());
        for (Future<Change> future : pending) {
            try {
                processed.add(future.get());
            } catch (ExecutionException e) {
                // Contained errors were already logged by the task itself; any
                // other failure would otherwise disappear without a trace.
                if (!(e.getCause() instanceof ContainedException)) {
                    log.warn("Unexpected error syncing.  Change left out of this batch.", e);
                }
            }
        }
        return processed;
    }

    /**
     * Filters a batch down to the changes that actually need processing and
     * fetches the repository values and references for their entities.
     *
     * <p>Changes without a revision (revision {@code <= -1}) are always kept.
     * For the others only the highest revision per entity is considered, and
     * of those only the entities returned by
     * {@code RdfRepository.hasRevisions} are kept.
     *
     * @param changes the changes of the batch
     * @return the retained changes with the values/refs fetched for them; both
     *     multimaps are empty when no change is retained
     */
    private ChangesWithValuesAndRefs getRevisionUpdates(Iterable<Change> changes) {
        final Set<Change> trueChanges = new HashSet<>();
        final Set<String> entityUris = new HashSet<>();
        // Highest-revision change seen so far for each entity id.
        final HashMap<String, Change> latestByEntity = new HashMap<>();

        for (Change change : changes) {
            if (change.revision() > -1) {
                final Change known = latestByEntity.get(change.entityId());
                if (known == null || known.revision() < change.revision()) {
                    latestByEntity.put(change.entityId(), change);
                }
            } else {
                trueChanges.add(change);
                entityUris.add(uris.entityIdToURI(change.entityId()));
            }
        }

        if (!latestByEntity.isEmpty()) {
            for (String entityUri : rdfRepository.hasRevisions(latestByEntity.values())) {
                entityUris.add(entityUri);
                trueChanges.add(latestByEntity.get(uris.entityURItoId(entityUri)));
            }
        }

        log.debug("Filtered batch contains {} changes", trueChanges.size());
        if (trueChanges.isEmpty()) {
            return new ChangesWithValuesAndRefs(trueChanges, ImmutableSetMultimap.of(), ImmutableSetMultimap.of());
        }

        final ImmutableSetMultimap<String, String> values = rdfRepository.getValues(entityUris);
        final ImmutableSetMultimap<String, String> refs = rdfRepository.getRefs(entityUris);
        if (log.isDebugEnabled()) {
            synchronized (this) {
                log.debug("Fetched {} values", values.size());
                log.debug("Fetched {} refs", refs.size());
            }
        }
        return new ChangesWithValuesAndRefs(trueChanges, values, refs);
    }

    /**
     * Polls the change source until it yields a batch that contains changes.
     *
     * <p>When the source reports no changes at all, sleeps for the poll delay
     * and asks again from the same batch. When a batch has changes according
     * to {@code hasAnyChanges()} but its change list is empty, polling
     * continues from that batch. Retryable errors are logged and retried.
     *
     * @param b the batch to continue from
     * @return the next batch with a non-empty change list
     * @throws InterruptedException if interrupted while polling or sleeping
     */
    private B nextBatch(B b) throws InterruptedException {
        B current = b;
        while (true) {
            try {
                final B candidate = changeSource.nextBatch(current);
                if (!candidate.hasAnyChanges()) {
                    log.info("Sleeping for {} secs", pollDelay);
                    // Multiply as long: an int product can overflow for large delays.
                    Thread.sleep(pollDelay * 1000L);
                    continue;
                }
                if (!candidate.changes().isEmpty()) {
                    log.debug("{} changes in batch", candidate.changes().size());
                    return candidate;
                }
                current = candidate;
            } catch (RetryableException e) {
                log.warn("Retryable error fetching next batch.  Retrying.", e);
            }
        }
    }

    /**
     * Fetches the RDF for one change from Wikibase, munges it and stores the
     * statements and cleanup lists on the change. Called from the executor
     * tasks submitted by {@code fetchDataFromWikibaseAndMunge}.
     *
     * <p>When both the change revision and the munged revision are positive:
     * a munged revision older than the change is logged as stale and the
     * change is deferred; a munged revision newer than the change marks the
     * skip-ahead meter.
     *
     * @param change the change to process; updated in place
     * @param repoValues values multimap passed through to the munger
     * @param repoRefs references multimap passed through to the munger
     * @throws RetryableException if the operation should be retried
     */
    private void handleChange(Change change, Multimap<String, String> repoValues, Multimap<String, String> repoRefs) throws RetryableException {
        log.debug("Processing data for {}", change);
        final Collection<Statement> statements = wikibase.fetchRdfForEntity(change);
        final Set<String> valueCleanup = new HashSet<>();
        final Set<String> refCleanup = new HashSet<>();

        if (!statements.isEmpty()) {
            final long fetchedRevision =
                    munger.mungeWithValues(change.entityId(), statements, repoValues, repoRefs, valueCleanup, refCleanup);
            final long changeRevision = change.revision();
            if (changeRevision > 0 && fetchedRevision > 0) {
                if (fetchedRevision < changeRevision) {
                    log.warn("Stale revision on {}: change is {}, RDF is {}",
                            change.entityId(), changeRevision, fetchedRevision);
                    deferredChanges.add(change, DEFERRAL_DELAY);
                }
                if (changeRevision < fetchedRevision) {
                    skipAheadMeter.mark();
                }
            }
        }

        change.setRefCleanupList(refCleanup);
        change.setValueCleanupList(valueCleanup);
        change.setStatements(statements);
    }

    /**
     * Formats a meter's 1-, 5- and 15-minute rates as
     * {@code (r1, r5, r15)} with one decimal each, using {@link Locale#ROOT}.
     *
     * @param meter the meter to report on
     * @return the formatted rates
     */
    private String meterReport(Meter meter) {
        final double oneMinute = meter.getOneMinuteRate();
        final double fiveMinutes = meter.getFiveMinuteRate();
        final double fifteenMinutes = meter.getFifteenMinuteRate();
        return String.format(Locale.ROOT, "(%.1f, %.1f, %.1f)", oneMinute, fiveMinutes, fifteenMinutes);
    }
}
