package com.linkedin.camus.etl.kafka.mapred;

import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.common.EtlCounts;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;

/* loaded from: input_file:com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputCommitter.class */
/**
 * Output committer for the multi-output ETL job.
 *
 * <p>On task commit it does three things:
 * <ol>
 *   <li>When move-data is enabled, it renames each {@code data*} file in the task work path
 *       into a partitioned location under the configured destination path.</li>
 *   <li>When tracking-post is also enabled, it writes the accumulated event counts into the
 *       work path.</li>
 *   <li>It always writes the latest recorded {@link EtlKey} per offset key to an offsets
 *       {@link SequenceFile} in the work path.</li>
 * </ol>
 * It then delegates to {@link FileOutputCommitter#commitTask}.
 */
public class EtlMultiOutputCommitter extends FileOutputCommitter {
    /** Filename prefix for the counts files written into the work path. */
    private static final String COUNTS_PREFIX = "counts.";

    private final Pattern workingFileMetadataPattern;
    /** Event counts keyed by working-file name (without the "-m..." task suffix). */
    private final Map<String, EtlCounts> counts = new HashMap<>();
    /** Latest key seen, keyed by "topic-leaderId-partition" (see {@link #addOffset}). */
    private final Map<String, EtlKey> offsets = new HashMap<>();
    private final TaskAttemptContext context;
    private final RecordWriterProvider recordWriterProvider;
    private final Logger log;

    /**
     * Records one event for the working file that {@code etlKey} maps to, and records its offset.
     *
     * @param etlKey key of the event being written
     * @throws IOException if the working file name cannot be resolved
     */
    public void addCounts(EtlKey etlKey) throws IOException {
        String workingFileName = EtlMultiOutputFormat.getWorkingFileName(this.context, etlKey);
        EtlCounts fileCounts = this.counts.get(workingFileName);
        if (fileCounts == null) {
            fileCounts = new EtlCounts(etlKey.getTopic(),
                    EtlMultiOutputFormat.getMonitorTimeGranularityMs(this.context));
            this.counts.put(workingFileName, fileCounts);
        }
        fileCounts.incrementMonitorCount(etlKey);
        addOffset(etlKey);
    }

    /**
     * Remembers a copy of {@code etlKey} as the latest key for its topic/leader/partition.
     * A later call with the same topic, leader id and partition replaces the earlier entry.
     *
     * @param etlKey key to record; it is copied, so later mutation by the caller has no effect
     */
    public void addOffset(EtlKey etlKey) {
        String offsetKey = etlKey.getTopic() + "-" + etlKey.getLeaderId() + "-" + etlKey.getPartition();
        this.offsets.put(offsetKey, new EtlKey(etlKey));
    }

    /**
     * Creates the committer and instantiates the configured {@link RecordWriterProvider}.
     *
     * @param path job output path, passed to {@link FileOutputCommitter}
     * @param taskAttemptContext context of the task attempt being committed
     * @param logger logger used for commit progress messages
     * @throws IOException if the parent committer cannot be created
     * @throws IllegalStateException if the record writer provider cannot be instantiated
     */
    public EtlMultiOutputCommitter(Path path, TaskAttemptContext taskAttemptContext, Logger logger) throws IOException {
        super(path, taskAttemptContext);
        this.context = taskAttemptContext;
        this.log = logger;
        try {
            this.recordWriterProvider = EtlMultiOutputFormat.getRecordWriterProviderClass(taskAttemptContext).newInstance();
            // Quote the extension: it is literal text (e.g. ".avro"), and an unquoted '.' would
            // match any character.
            this.workingFileMetadataPattern = Pattern.compile(
                    "data\\.([^\\.]+)\\.([\\d_]+)\\.(\\d+)\\.([^\\.]+)-m-\\d+"
                            + Pattern.quote(this.recordWriterProvider.getFilenameExtension()));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Commits the task.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Optionally move the data files to their partitioned destination.</li>
     *   <li>Write the offsets file.</li>
     *   <li>Delegate to the parent committer.</li>
     * </ol>
     *
     * @param taskAttemptContext the task attempt being committed
     * @throws IOException if any file operation fails, including a failed move of a data file
     */
    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
        FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration());
        if (EtlMultiOutputFormat.isRunMoveData(taskAttemptContext)) {
            moveDataFiles(taskAttemptContext, fileSystem);
        }
        writeOffsets(taskAttemptContext, fileSystem);
        super.commitTask(taskAttemptContext);
    }

    /** Moves every "data*" file in the work path to its partitioned destination and records counts. */
    private void moveDataFiles(TaskAttemptContext taskContext, FileSystem fileSystem) throws IOException {
        boolean trackingPost = EtlMultiOutputFormat.isRunTrackingPost(taskContext);
        String extension = this.recordWriterProvider.getFilenameExtension();
        Path workPath = super.getWorkPath();
        Path destinationPath = EtlMultiOutputFormat.getDestinationPath(taskContext);
        ArrayList<Map<String, Object>> countObjects = new ArrayList<>();

        FileStatus[] workFiles = fileSystem.listStatus(workPath);
        // Older Hadoop FileSystem implementations return null rather than throwing for a missing path.
        if (workFiles != null) {
            for (FileStatus fileStatus : workFiles) {
                String name = fileStatus.getPath().getName();
                if (!name.startsWith("data")) {
                    continue;
                }
                EtlCounts etlCounts = countsForDataFile(name);
                Path target = new Path(destinationPath, getPartitionedPath(taskContext, name,
                        etlCounts.getEventCount(), etlCounts.getLastKey().getOffset()));
                if (!fileSystem.exists(target.getParent())) {
                    fileSystem.mkdirs(target.getParent());
                }
                commitFile(taskContext, fileStatus.getPath(), target);
                if (trackingPost) {
                    etlCounts.writeCountsToMap(countObjects, fileSystem,
                            new Path(workPath, COUNTS_PREFIX + stripSuffix(target.getName(), extension)));
                }
            }
        }
        if (trackingPost) {
            writeCountsFile(taskContext, fileSystem, workPath, countObjects);
        }
    }

    /** Looks up the counts recorded for a data file, failing with a descriptive error if absent. */
    private EtlCounts countsForDataFile(String dataFileName) throws IOException {
        int taskSuffixStart = dataFileName.lastIndexOf("-m");
        if (taskSuffixStart < 0) {
            throw new IOException("Working file '" + dataFileName + "' has no '-m' task suffix");
        }
        EtlCounts etlCounts = this.counts.get(dataFileName.substring(0, taskSuffixStart));
        if (etlCounts == null) {
            throw new IOException("No counts were recorded for working file '" + dataFileName + "'");
        }
        return etlCounts;
    }

    /** Serializes the collected count objects as JSON to "counts.<task id>" in the work path. */
    private void writeCountsFile(TaskAttemptContext taskContext, FileSystem fileSystem, Path workPath,
            ArrayList<Map<String, Object>> countObjects) throws IOException {
        Path countsPath = new Path(workPath, COUNTS_PREFIX + taskContext.getConfiguration().get("mapred.task.id"));
        BufferedOutputStream out = new BufferedOutputStream(fileSystem.create(countsPath));
        try {
            this.log.info("Writing counts to : " + countsPath.toString());
            long startMs = System.currentTimeMillis();
            new ObjectMapper().writeValue(out, countObjects);
            this.log.debug("Time taken : " + (System.currentTimeMillis() - startMs) + " ms");
        } finally {
            // The JSON generator may already have closed the stream; closing again is harmless and
            // guarantees the file handle is released on every path.
            out.close();
        }
    }

    /** Writes every recorded offset key to the task's offsets SequenceFile in the work path. */
    private void writeOffsets(TaskAttemptContext taskContext, FileSystem fileSystem) throws IOException {
        Path offsetsPath = new Path(super.getWorkPath(),
                EtlMultiOutputFormat.getUniqueFile(taskContext, EtlMultiOutputFormat.OFFSET_PREFIX, ""));
        SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, taskContext.getConfiguration(),
                offsetsPath, EtlKey.class, NullWritable.class);
        try {
            for (EtlKey offsetKey : this.offsets.values()) {
                writer.append(offsetKey, NullWritable.get());
            }
        } finally {
            writer.close();
        }
    }

    /** Returns {@code name} without a trailing {@code suffix}; unchanged if it does not end with it. */
    private static String stripSuffix(String name, String suffix) {
        if (suffix != null && !suffix.isEmpty() && name.endsWith(suffix)) {
            return name.substring(0, name.length() - suffix.length());
        }
        return name;
    }

    /**
     * Moves a committed data file to its final location.
     *
     * @param jobContext context supplying the file system configuration
     * @param source file to move
     * @param target destination path
     * @throws IOException if the rename fails; {@link FileSystem#rename} reports failure by
     *     returning {@code false}, which previously caused data to be silently left behind
     */
    protected void commitFile(JobContext jobContext, Path source, Path target) throws IOException {
        if (!FileSystem.get(jobContext.getConfiguration()).rename(source, target)) {
            throw new IOException("Failed to move " + source + " to " + target);
        }
    }

    /**
     * Builds the destination path (relative to the destination root) for a working data file.
     *
     * <p>The working file name is parsed with the constructor's pattern into four dot-separated
     * fields. The result is the partitioner's directory for those fields, followed by a file name
     * that additionally embeds {@code eventCount} and {@code lastOffset}.
     *
     * @param jobContext job context used to look up the partitioner
     * @param str working data file name
     * @param eventCount number of events recorded for the file
     * @param lastOffset offset of the last key recorded for the file
     * @return the partitioned relative path, ending in the record writer's file extension
     * @throws IOException if {@code str} does not match the working-file naming pattern
     */
    public String getPartitionedPath(JobContext jobContext, String str, int eventCount, long lastOffset) throws IOException {
        Matcher matcher = this.workingFileMetadataPattern.matcher(str);
        if (!matcher.find()) {
            throw new IOException("Could not extract metadata from working filename '" + str + "'");
        }
        // NOTE(review): field meanings (topic, leader id, partition, encoded partition) are
        // inferred from the naming scheme; confirm against EtlMultiOutputFormat.getWorkingFileName.
        String topic = matcher.group(1);
        String leaderId = matcher.group(2);
        String partition = matcher.group(3);
        String encodedPartition = matcher.group(4);
        String directory = EtlMultiOutputFormat.getPartitioner(jobContext, topic)
                .generatePartitionedPath(jobContext, topic, leaderId, Integer.parseInt(partition), encodedPartition);
        return directory + "/" + topic + "." + leaderId + "." + partition + "." + eventCount + "." + lastOffset
                + "." + encodedPartition + this.recordWriterProvider.getFilenameExtension();
    }
}
