package org.wikidata.query.rdf.tool.change;

import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.net.URI;
import java.time.Instant;
import java.util.Locale;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.wikidata.query.rdf.tool.Utils;
import org.wikidata.query.rdf.tool.rdf.UpdateBuilder;
import org.wikidata.query.rdf.tool.rdf.client.RdfClient;

/* loaded from: input_file:org/wikidata/query/rdf/tool/change/RdfKafkaOffsetsRepository.class */
public class RdfKafkaOffsetsRepository implements KafkaOffsetsRepository {
    private static final String GET_OFFSETS = Utils.loadBody("GetKafkaOffsets", RdfKafkaOffsetsRepository.class);
    private static final String UPDATE_OFFSETS = Utils.loadBody("updateOffsets", RdfKafkaOffsetsRepository.class);
    private final URI root;
    private final RdfClient rdfClient;

    public RdfKafkaOffsetsRepository(URI uri, RdfClient rdfClient) {
        this.root = uri;
        this.rdfClient = rdfClient;
    }

    @Override // org.wikidata.query.rdf.tool.change.KafkaOffsetsRepository
    public Map<TopicPartition, OffsetAndTimestamp> load(Instant instant) {
        UpdateBuilder updateBuilder = new UpdateBuilder(GET_OFFSETS);
        updateBuilder.bindUri("root", this.root);
        return (Map) this.rdfClient.selectToMap(updateBuilder.toString(), ConsumerProtocol.TOPIC_KEY_NAME, "offset").entries().stream().collect(ImmutableMap.toImmutableMap(entry -> {
            String[] split = ((String) entry.getKey()).split(":", 2);
            return new TopicPartition(split[0], Integer.parseInt(split[1]));
        }, entry2 -> {
            return new OffsetAndTimestamp(Integer.parseInt((String) entry2.getValue()), instant.toEpochMilli());
        }));
    }

    @Override // org.wikidata.query.rdf.tool.change.KafkaOffsetsRepository
    @SuppressFBWarnings(value = {"VA_FORMAT_STRING_USES_NEWLINE"}, justification = "we want to be platform independent here.")
    public void store(Map<TopicPartition, OffsetAndMetadata> map) {
        UpdateBuilder updateBuilder = new UpdateBuilder(UPDATE_OFFSETS);
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            sb.append(String.format(Locale.ROOT, "<%s> wikibase:kafka ( \"%s:%d\" %d ) .\n", this.root, key.topic(), Integer.valueOf(key.partition()), Long.valueOf(entry.getValue().offset())));
        }
        updateBuilder.bindUri("root", this.root);
        updateBuilder.bind("data", sb.toString());
        this.rdfClient.update(updateBuilder.toString());
    }
}
