package org.wikidata.query.rdf.tool;

import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lexicalscope.jewel.cli.Option;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.time.DateUtils;
import org.openrdf.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.common.uri.WikibaseUris;
import org.wikidata.query.rdf.tool.OptionsUtils;
import org.wikidata.query.rdf.tool.change.Change;
import org.wikidata.query.rdf.tool.change.Change.Batch;
import org.wikidata.query.rdf.tool.change.IdListChangeSource;
import org.wikidata.query.rdf.tool.change.IdRangeChangeSource;
import org.wikidata.query.rdf.tool.change.RecentChangesPoller;
import org.wikidata.query.rdf.tool.exception.ContainedException;
import org.wikidata.query.rdf.tool.exception.RetryableException;
import org.wikidata.query.rdf.tool.rdf.Munger;
import org.wikidata.query.rdf.tool.rdf.RdfRepository;
import org.wikidata.query.rdf.tool.wikibase.WikibaseRepository;

/**
 * Pulls changes from a {@link Change.Source} and syncs them into an RDF store.
 * <p>
 * Each batch is filtered down to the changes that need syncing (see {@link #getRevisionUpdates}),
 * the RDF for those entities is fetched from Wikibase and munged in parallel on {@link #executor},
 * and the result is written with {@link RdfRepository#syncFromChanges}. After each batch the
 * "left off" time is recorded in the RDF store so a later run can resume from it. Throughput is
 * tracked in a per-instance {@link MetricRegistry} that is published over JMX.
 *
 * @param <B> batch type produced by the change source
 */
public class Update<B extends Change.Batch> implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(Update.class);

    /** Registry holding this instance's meters; published over JMX by {@link #reporter}. */
    private final MetricRegistry metrics = new MetricRegistry();
    /** Marked with the number of changes synced per batch. */
    private final Meter updateMeter = metrics.meter("updates");
    /** Marked with {@link Change.Batch#advanced()} per batch. */
    private final Meter batchAdvanced = metrics.meter("batch-progress");
    /** Started in the constructor. */
    private final JmxReporter reporter = JmxReporter.forRegistry(metrics).build();
    private final Change.Source<B> changeSource;
    private final WikibaseRepository wikibase;
    private final RdfRepository rdfRepository;
    private final Munger munger;
    /** Runs {@link #handleChange} for each change of a batch. */
    private final ExecutorService executor;
    /** Seconds to sleep when the change source returns an empty batch. */
    private final int pollDelay;
    private final WikibaseUris uris;
    /**
     * Values currently in the RDF store for the entities of the batch being processed, keyed by
     * entity id. Assigned by {@link #getRevisionUpdates} on the polling thread before the worker
     * tasks are submitted (submission to an ExecutorService happens-before task execution), then
     * only read by those tasks. Null when the current batch has nothing to sync.
     */
    private Multimap<String, String> repoValues;
    /** References currently in the RDF store, keyed by entity id; same lifecycle as {@link #repoValues}. */
    private Multimap<String, String> repoRefs;
    /** Passed through to {@link RdfRepository#syncFromChanges}. */
    private final boolean verify;

    /**
     * Command line options for {@link Update#main}.
     */
    public interface Options extends OptionsUtils.BasicOptions, OptionsUtils.MungerOptions, OptionsUtils.WikibaseOptions {
        @Option(defaultValue = {"https"}, description = "Wikidata url scheme")
        String wikibaseScheme();

        @Option(shortName = {"s"}, defaultToNull = true, description = "Start time in 2015-02-11T17:11:08Z or 20150211170100 format.")
        String start();

        @Option(defaultToNull = true, description = "If specified must be <id> or list of <id>, comma or space separated.")
        List<String> ids();

        @Option(defaultToNull = true, description = "If specified must be <start>-<end>. Ids are iterated instead of recent changes. Start and end are inclusive.")
        String idrange();

        @Option(shortName = {"u"}, description = "URL to post updates and queries.")
        String sparqlUrl();

        @Option(shortName = {"d"}, defaultValue = {"10"}, description = "Poll delay when no updates found")
        int pollDelay();

        @Option(shortName = {"t"}, defaultValue = {"10"}, description = "Thread count")
        int threadCount();

        @Option(shortName = {"b"}, defaultValue = {"100"}, description = "Number of recent changes fetched at a time.")
        int batchSize();

        @Option(shortName = {"V"}, longName = {"verify"}, description = "Verify updates (may have performance impact)")
        boolean verify();

        @Option(defaultValue = {"0"}, shortName = {"T"}, longName = {"tailPoller"}, description = "Use secondary poller with given gap (seconds) to catch up missed updates")
        int tailPollerOffset();

        @Option(defaultToNull = true, description = "If specified must be numerical indexes of Item and Property namespaces that defined in Wikibase repository, comma separated.")
        String entityNamespaces();
    }

    /**
     * Command line entry point.
     * <p>
     * Builds the Wikibase and RDF repositories and the change source from {@link Options}, then
     * runs the updater on the calling thread. Configuration errors are logged and cause an early
     * return. The RDF repository is closed and the worker pool is shut down on every exit path.
     *
     * @param args command line arguments
     */
    public static void main(String[] args) {
        Options options = (Options) OptionsUtils.handleOptions(Options.class, args);
        WikibaseRepository wikibaseRepository = buildWikibaseRepository(options);
        if (wikibaseRepository == null) {
            return;
        }
        try {
            URI sparqlUri = new URI(options.sparqlUrl());
            WikibaseUris wikibaseUris = new WikibaseUris(options.wikibaseHost());
            RdfRepository rdfRepository = new RdfRepository(sparqlUri, wikibaseUris);
            try {
                Change.Source<? extends Change.Batch> changeSource =
                        buildChangeSource(options, rdfRepository, wikibaseRepository);
                if (changeSource == null) {
                    // Already logged; the finally block still closes the repository.
                    return;
                }
                int threadCount = options.threadCount();
                ExecutorService executor = new ThreadPoolExecutor(threadCount, threadCount, 0L, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(),
                        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("update %s").build());
                try {
                    Update<?> update = new Update<>(changeSource, wikibaseRepository, rdfRepository,
                            OptionsUtils.mungerFromOptions(options), executor, options.pollDelay(), wikibaseUris,
                            options.verify());
                    update.run();
                } finally {
                    executor.shutdown();
                }
            } finally {
                rdfRepository.close();
            }
        } catch (URISyntaxException e) {
            log.error("Invalid url:  " + options.sparqlUrl(), e);
        }
    }

    /**
     * Chooses the change source from the options.
     * <p>
     * The first matching option wins:
     * <ol>
     * <li>{@code --idrange}</li>
     * <li>{@code --ids}</li>
     * <li>otherwise a {@link RecentChangesPoller} starting at the resolved start time</li>
     * </ol>
     *
     * @return the change source, or null if the options are invalid (already logged)
     */
    private static Change.Source<? extends Change.Batch> buildChangeSource(Options options, RdfRepository rdfRepository,
            WikibaseRepository wikibaseRepository) {
        if (options.idrange() != null) {
            return buildIdRangeChangeSource(options.idrange(), options.batchSize());
        }
        if (options.ids() != null) {
            return buildIdListChangeSource(options.ids(), options.batchSize());
        }
        Date startTime = resolveStartTime(options, rdfRepository);
        if (startTime == null) {
            return null;
        }
        return new RecentChangesPoller(wikibaseRepository, startTime, options.batchSize(), options.tailPollerOffset());
    }

    /**
     * Parses {@code --idrange}.
     * <p>
     * The accepted forms are:
     * <ul>
     * <li>{@code <start>-<end>}</li>
     * <li>a single number, a one-element range</li>
     * <li>a single token that does not start with a digit, which is treated as one explicit id</li>
     * </ul>
     *
     * @return the change source, or null if the value is malformed (already logged)
     */
    private static Change.Source<? extends Change.Batch> buildIdRangeChangeSource(String idrange, int batchSize) {
        String[] parts = idrange.split("-");
        if (parts.length == 1 && !parts[0].isEmpty() && !Character.isDigit(parts[0].charAt(0))) {
            return new IdListChangeSource(parts, batchSize);
        }
        if (parts.length != 1 && parts.length != 2) {
            log.error("Invalid format for --idrange.  Need <start>-<stop>.");
            return null;
        }
        try {
            long start = Long.parseLong(parts[0]);
            long end = parts.length == 2 ? Long.parseLong(parts[1]) : start;
            return IdRangeChangeSource.forItems(start, end, batchSize);
        } catch (NumberFormatException e) {
            // Empty or non-numeric bounds, e.g. "" or "Q1-Q5".
            log.error("Invalid format for --idrange.  Need <start>-<stop>.", e);
            return null;
        }
    }

    /**
     * Flattens {@code --ids} values into one id list.
     * <p>
     * Each value may itself be comma separated. Tokens are trimmed and empty tokens are dropped.
     */
    private static Change.Source<? extends Change.Batch> buildIdListChangeSource(List<String> idOptions, int batchSize) {
        List<String> ids = new ArrayList<>();
        for (String option : idOptions) {
            for (String id : option.split(",")) {
                String trimmed = id.trim();
                if (!trimmed.isEmpty()) {
                    ids.add(trimmed);
                }
            }
        }
        return new IdListChangeSource(ids.toArray(new String[ids.size()]), batchSize);
    }

    /**
     * Determines where recent-changes polling should start.
     * <p>
     * If {@code --start} is given, it is parsed. Otherwise the left-off time stored in the RDF
     * store is used, and the default when none is stored is 30 days ago. A stored time older than
     * 30 days is rejected.
     *
     * @return the start time, or null if it cannot be determined safely (already logged)
     */
    private static Date resolveStartTime(Options options, RdfRepository rdfRepository) {
        if (options.start() != null) {
            return parseStartTime(options.start());
        }
        log.info("Checking where we left off");
        Date leftOffTime = rdfRepository.fetchLeftOffTime();
        long minStartTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30L);
        if (leftOffTime == null) {
            Date start = new Date(minStartTime);
            log.info("Defaulting start time to 30 days ago:  {}", WikibaseRepository.inputDateFormat().format(start));
            return start;
        }
        if (leftOffTime.getTime() < minStartTime) {
            log.error("RDF store reports the last update time is before the minimum safe poll time.  You will have to reload from scratch or you might have missing data.");
            return null;
        }
        log.info("Found start time in the RDF store: {}", WikibaseRepository.inputDateFormat().format(leftOffTime));
        return new Date(leftOffTime.getTime());
    }

    /**
     * Parses {@code --start}.
     * <p>
     * {@link WikibaseRepository#outputDateFormat()} is tried first, then
     * {@link WikibaseRepository#inputDateFormat()}.
     *
     * @return the parsed time, or null if neither format matches (already logged)
     */
    private static Date parseStartTime(String start) {
        try {
            return new Date(WikibaseRepository.outputDateFormat().parse(start).getTime());
        } catch (ParseException outputFormatMismatch) {
            try {
                return new Date(WikibaseRepository.inputDateFormat().parse(start).getTime());
            } catch (ParseException inputFormatMismatch) {
                log.error("Invalid date:  {}", start);
                return null;
            }
        }
    }

    /**
     * Builds the Wikibase repository client.
     * <p>
     * If {@code --entityNamespaces} is set, its comma separated (and trimmed) namespace indexes are
     * passed to the repository. The {@code 0} argument is passed through as in the original wiring;
     * its meaning is defined by {@link WikibaseRepository}.
     *
     * @return the repository, or null if a namespace index is not an integer (already logged)
     */
    private static WikibaseRepository buildWikibaseRepository(Options options) {
        if (options.entityNamespaces() == null) {
            return new WikibaseRepository(options.wikibaseScheme(), options.wikibaseHost());
        }
        String[] tokens = options.entityNamespaces().split(",");
        long[] namespaces = new long[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            try {
                namespaces[i] = Long.parseLong(tokens[i].trim());
            } catch (NumberFormatException e) {
                log.error("Invalid value for --entityNamespaces. Namespace index should be an integer.", e);
                return null;
            }
        }
        return new WikibaseRepository(options.wikibaseScheme(), options.wikibaseHost(), 0, namespaces);
    }

    /**
     * Creates an updater and starts its JMX metrics reporter.
     *
     * @param source where changes come from
     * @param wikibaseRepository used to fetch RDF for changed entities
     * @param rdfRepository the store being kept in sync
     * @param munger applied to the fetched RDF before it is written
     * @param executorService runs the per-change fetch/munge tasks
     * @param pollDelay seconds to sleep when the source returns an empty batch
     * @param wikibaseUris used to build entity URIs from entity ids
     * @param verify passed through to {@link RdfRepository#syncFromChanges}
     */
    public Update(Change.Source<B> source, WikibaseRepository wikibaseRepository, RdfRepository rdfRepository,
            Munger munger, ExecutorService executorService, int pollDelay, WikibaseUris wikibaseUris, boolean verify) {
        this.changeSource = source;
        this.wikibase = wikibaseRepository;
        this.rdfRepository = rdfRepository;
        this.munger = munger;
        this.executor = executorService;
        this.pollDelay = pollDelay;
        this.uris = wikibaseUris;
        this.verify = verify;
        this.reporter.start();
    }

    /**
     * Processes batches until the source reports its last batch.
     * <p>
     * Returns early on a fatal ({@link ExecutionException}) sync error. It also returns on
     * interruption, in which case the thread's interrupt flag is restored.
     */
    @Override
    public void run() {
        B batch = fetchFirstBatch();
        log.debug("{} changes in batch", batch.changes().size());
        Date lastRecordedLeftOff = null;
        try {
            while (true) {
                handleChanges(batch);
                lastRecordedLeftOff = recordLeftOffTime(batch, lastRecordedLeftOff);
                batchAdvanced.mark(batch.advanced());
                log.info("Polled up to {} at {} updates per second and {} {} per second", new Object[] {
                        batch.leftOffHuman(), meterReport(updateMeter), meterReport(batchAdvanced),
                        batch.advancedUnits()});
                if (batch.last()) {
                    return;
                }
                batch = nextBatch(batch);
            }
        } catch (InterruptedException e) {
            // Stop instead of looping: with the flag set every Future.get() would fail immediately
            // and the same batch would be re-processed forever.
            Thread.currentThread().interrupt();
            log.warn("Interrupted; stopping updates.");
        } catch (ExecutionException e) {
            log.error("Syncing encountered a fatal exception", e);
        }
    }

    /**
     * Asks the change source for its first batch.
     * <p>
     * The request is repeated, without delay, after a {@link RetryableException} or a null result.
     */
    private B fetchFirstBatch() {
        while (true) {
            try {
                B batch = changeSource.firstBatch();
                if (batch != null) {
                    return batch;
                }
            } catch (RetryableException e) {
                log.warn("Retryable error fetching first batch.  Retrying.", e);
            }
        }
    }

    /**
     * Stores the batch's left-off date, minus one second, in the RDF store.
     * <p>
     * The store is only written when that value differs from the last one written. The one-second
     * step back presumably makes a restart re-check changes from the boundary second.
     * NOTE(review): confirm that intent.
     *
     * @param batch the batch just synced
     * @param lastRecorded the value last written, or null if nothing has been written yet
     * @return the value now stored (unchanged when the batch has no left-off date)
     */
    private Date recordLeftOffTime(B batch, Date lastRecorded) {
        Date leftOffDate = batch.leftOffDate();
        if (leftOffDate == null) {
            return lastRecorded;
        }
        Date toRecord = DateUtils.addSeconds(leftOffDate, -1);
        if (lastRecorded == null || !lastRecorded.equals(toRecord)) {
            rdfRepository.updateLeftOffTime(toRecord);
            return toRecord;
        }
        return lastRecorded;
    }

    /**
     * Syncs one batch.
     * <p>
     * The steps are:
     * <ol>
     * <li>Select the changes to sync.</li>
     * <li>Run {@link #handleChange} for each one on the executor and wait for all of them.
     *     A {@link RetryableException} is retried indefinitely; a {@link ContainedException}
     *     gives up on that change only.</li>
     * <li>Write the results to the RDF store.</li>
     * </ol>
     *
     * @throws InterruptedException if interrupted while waiting for a task
     * @throws ExecutionException if a task failed with any other exception
     */
    private void handleChanges(Change.Batch batch) throws InterruptedException, ExecutionException {
        Set<Change> changesToSync = getRevisionUpdates(batch);
        long prepareStart = System.currentTimeMillis();
        List<Future<?>> tasks = new ArrayList<>(changesToSync.size());
        for (final Change change : changesToSync) {
            tasks.add(executor.submit(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            handleChange(change);
                            return;
                        } catch (ContainedException e) {
                            log.warn("Contained error syncing.  Giving up on " + change.entityId(), e);
                            return;
                        } catch (RetryableException e) {
                            log.warn("Retryable error syncing.  Retrying.", e);
                        }
                    }
                }
            }));
        }
        for (Future<?> task : tasks) {
            task.get();
        }
        log.debug("Preparing update data took {} ms, have {} changes",
                System.currentTimeMillis() - prepareStart, changesToSync.size());
        rdfRepository.syncFromChanges(changesToSync, verify);
        updateMeter.mark(changesToSync.size());
    }

    /**
     * Selects the changes in {@code batch} that need syncing.
     * <p>
     * Changes with a negative revision are always included. For the remaining changes, only the
     * highest revision per entity is kept. Of those, only the ones whose entity URI is returned
     * by {@link RdfRepository#hasRevisions} are included. As a side effect, {@link #repoValues}
     * and {@link #repoRefs} are loaded for the included entities, or set to null when nothing is
     * included.
     */
    private Set<Change> getRevisionUpdates(Change.Batch batch) {
        Set<Change> selected = new HashSet<>();
        Set<String> selectedUris = new HashSet<>();
        HashMap<String, Change> latestByEntity = new HashMap<>();
        String entityPrefix = uris.entity();
        for (Change change : batch.changes()) {
            if (change.revision() >= 0) {
                Change known = latestByEntity.get(change.entityId());
                if (known == null || known.revision() < change.revision()) {
                    latestByEntity.put(change.entityId(), change);
                }
            } else {
                selected.add(change);
                selectedUris.add(entityPrefix + change.entityId());
            }
        }
        if (!latestByEntity.isEmpty()) {
            for (String entityUri : rdfRepository.hasRevisions(latestByEntity.values())) {
                selectedUris.add(entityUri);
                selected.add(latestByEntity.get(entityUri.substring(entityPrefix.length())));
            }
        }
        log.debug("Filtered batch contains {} changes", selected.size());
        if (selected.isEmpty()) {
            repoValues = null;
            repoRefs = null;
        } else {
            repoValues = rdfRepository.getValues(selectedUris);
            log.debug("Fetched {} values", repoValues.size());
            repoRefs = rdfRepository.getRefs(selectedUris);
            log.debug("Fetched {} refs", repoRefs.size());
        }
        return selected;
    }

    /**
     * Fetches the batch after {@code prevBatch}.
     * <p>
     * A {@link RetryableException} is retried immediately. An empty batch causes a sleep of
     * {@link #pollDelay} seconds, after which the fetch is repeated from the same
     * {@code prevBatch}.
     *
     * @return the next non-empty batch
     * @throws InterruptedException if interrupted while sleeping
     */
    private B nextBatch(B prevBatch) throws InterruptedException {
        while (true) {
            B batch;
            try {
                batch = changeSource.nextBatch(prevBatch);
            } catch (RetryableException e) {
                log.warn("Retryable error fetching next batch.  Retrying.", e);
                continue;
            }
            if (!batch.changes().isEmpty()) {
                log.debug("{} changes in batch", batch.changes().size());
                return batch;
            }
            log.debug("Sleeping for {} secs", pollDelay);
            // TimeUnit avoids the int overflow of pollDelay * 1000 for very large delays.
            TimeUnit.SECONDS.sleep(pollDelay);
        }
    }

    /**
     * Fetches and munges the RDF for one change, and stores the result on the change.
     * <p>
     * The munger receives copies of the entity's current store values and references. Whatever
     * remains in those copies afterwards becomes the change's cleanup list. The method is called
     * from the worker tasks submitted in {@link #handleChanges}; it reads {@link #repoValues} and
     * {@link #repoRefs}, which are non-null whenever there is something to sync.
     *
     * @throws RetryableException propagated from fetching or munging; the caller retries
     */
    public void handleChange(Change change) throws RetryableException {
        log.debug("Processing data for {}", change);
        String entityId = change.entityId();
        Collection<Statement> statements = wikibase.fetchRdfForEntity(entityId);
        Set<String> existingValues = new HashSet<>(repoValues.get(entityId));
        Set<String> existingRefs = new HashSet<>(repoRefs.get(entityId));
        munger.munge(entityId, statements, existingValues, existingRefs, change);
        List<String> cleanupList = new ArrayList<>(existingValues.size() + existingRefs.size());
        cleanupList.addAll(existingValues);
        cleanupList.addAll(existingRefs);
        change.setStatements(statements);
        change.setCleanupList(cleanupList);
    }

    /** Formats a meter's 1, 5 and 15 minute rates as {@code (a, b, c)} with one decimal each. */
    private String meterReport(Meter meter) {
        return String.format(Locale.ROOT, "(%.1f, %.1f, %.1f)",
                meter.getOneMinuteRate(), meter.getFiveMinuteRate(), meter.getFifteenMinuteRate());
    }
}
