package org.wikidata.query.rdf.tool.change;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.tool.HttpClientUtils;
import org.wikidata.query.rdf.tool.change.RecentChangesPoller;
import org.wikidata.query.rdf.tool.exception.RetryableException;

/* loaded from: input_file:org/wikidata/query/rdf/tool/change/TailingChangesPoller.class */
public class TailingChangesPoller extends Thread {
    private static final Logger log = LoggerFactory.getLogger(TailingChangesPoller.class);
    private final RecentChangesPoller poller;
    private RecentChangesPoller.Batch lastBatch;
    private final int tailSeconds;
    private final BlockingQueue<RecentChangesPoller.Batch> queue;
    private volatile Instant mainPollerTs;

    public TailingChangesPoller(RecentChangesPoller recentChangesPoller, BlockingQueue<RecentChangesPoller.Batch> blockingQueue, int i) {
        this.poller = recentChangesPoller;
        this.tailSeconds = i;
        this.queue = blockingQueue;
    }

    public void setPollerTs(Instant instant) {
        this.mainPollerTs = instant;
    }

    public boolean isOldEnough(Instant instant) {
        return instant.isBefore(Instant.now().minusSeconds(this.tailSeconds));
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        setName("TailPoller");
        while (true) {
            try {
                try {
                    if (this.lastBatch == null) {
                        this.lastBatch = this.poller.firstBatch();
                    } else {
                        this.lastBatch = this.poller.nextBatch(this.lastBatch);
                    }
                } catch (RetryableException e) {
                    log.warn("Retryable error fetching first batch.  Retrying.", e);
                }
                if (!this.lastBatch.changes().isEmpty()) {
                    log.info("Caught {} missing updates, adding to the queue", Integer.valueOf(this.lastBatch.changes().size()));
                    this.queue.put(this.lastBatch);
                }
                log.info("Tail poll up to {}", this.lastBatch.leftOffDate());
                if (this.mainPollerTs != null && this.mainPollerTs.isBefore(this.lastBatch.leftOffDate())) {
                    long between = ChronoUnit.MILLIS.between(this.mainPollerTs, this.lastBatch.leftOffDate()) + (this.tailSeconds * HttpClientUtils.HTTP_RETRY_DELAY);
                    log.info("Got ahead of main poller ({} > {}), sleeping for {}...", new Object[]{this.lastBatch.leftOffDate(), this.mainPollerTs, Long.valueOf(between)});
                    Thread.sleep(between);
                }
                if (!isOldEnough(this.lastBatch.leftOffDate())) {
                    long between2 = ChronoUnit.MILLIS.between(this.lastBatch.leftOffDate(), Instant.now().plusSeconds(this.tailSeconds + 2));
                    log.info("Got too close to the current stream, sleeping for {}...", Long.valueOf(between2));
                    Thread.sleep(between2);
                }
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
