package com.bigdata.service.jini.master;

import com.bigdata.counters.CounterSet;
import com.bigdata.io.SerializerUtil;
import com.bigdata.jini.start.BigdataZooDefs;
import com.bigdata.service.IRemoteExecutor;
import com.bigdata.service.jini.JiniFederation;
import com.bigdata.service.jini.master.TaskMaster.JobState;
import com.bigdata.service.jini.util.DumpFederation;
import com.bigdata.util.concurrent.ExecutionExceptions;
import com.bigdata.zookeeper.ZLock;
import com.bigdata.zookeeper.ZLockImpl;
import com.bigdata.zookeeper.ZooHelper;
import com.tinkerpop.rexster.Tokens;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.jini.config.Configuration;
import net.jini.config.ConfigurationException;
import net.jini.core.lookup.ServiceItem;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:bigdata-1.5.1.jar:com/bigdata/service/jini/master/TaskMaster.class */
public abstract class TaskMaster<S extends JobState, T extends Callable<U>, U> implements Callable<Void> {
    protected static final Logger log = Logger.getLogger(TaskMaster.class);
    protected final JiniFederation<?> fed;
    private S jobState;

    /**
     * Names of the configuration entries that the {@link JobState} constructor
     * looks up under the job's component name.
     */
    public interface ConfigurationOptions {
        /** Optional {@code boolean} entry; {@link JobState} defaults it to {@code false}. */
        String FORCE_OVERFLOW = "forceOverflow";

        /** Optional {@link java.io.File} entry; {@link JobState} defaults it to {@code null}. */
        String INDEX_DUMP_DIR = "indexDumpDir";

        /** Optional {@code String} entry; {@link JobState} defaults it to {@code null}. */
        String INDEX_DUMP_NAMESPACE = "indexDumpNamespace";

        /** Optional {@code boolean} entry; {@link JobState} defaults it to {@code false}. */
        String DELETE_JOB = "deleteJob";

        /** {@code int} entry read by {@link JobState} without a default. */
        String NCLIENTS = "nclients";

        /** {@code ServicesTemplate} entry read by {@link JobState} without a default. */
        String CLIENTS_TEMPLATE = "clientsTemplate";

        /** Optional {@code int} entry; {@link JobState} defaults it to {@code 0}. */
        String NAGGREGATORS = "naggregators";

        /** Optional {@code ServicesTemplate} entry; {@link JobState} defaults it to {@code null}. */
        String AGGREGATORS_TEMPLATE = "aggregatorsTemplate";

        /** {@code ServicesTemplate[]} entry read by {@link JobState} without a default. */
        String SERVICES_TEMPLATES = "servicesTemplates";

        /**
         * {@code long} entry read by {@link JobState} without a default. Note that
         * the entry name ({@code awaitServicesTimeout}) differs from the constant name.
         */
        String SERVICES_DISCOVERY_TIMEOUT = "awaitServicesTimeout";

        /** {@code String} entry read by {@link JobState} without a default. */
        String JOB_NAME = "jobName";
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:bigdata-1.5.1.jar:com/bigdata/service/jini/master/TaskMaster$DiscoverServicesWithPreconditionsTask.class */
    public class DiscoverServicesWithPreconditionsTask implements Callable<DiscoveredServices> {
        public DiscoverServicesWithPreconditionsTask() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public DiscoveredServices call() throws Exception {
            ServiceItem[] serviceItemArr;
            if (TaskMaster.this.jobState == null) {
                throw new IllegalArgumentException();
            }
            if (TaskMaster.this.jobState.servicesTemplates == null) {
                throw new IllegalArgumentException();
            }
            if (TaskMaster.this.jobState.servicesDiscoveryTimeout <= 0) {
                throw new IllegalArgumentException();
            }
            Future submit = TaskMaster.this.fed.getExecutorService().submit(new DiscoverServices(TaskMaster.this.fed, TaskMaster.this.jobState.clientsTemplate, TaskMaster.this.jobState.servicesDiscoveryTimeout));
            Future submit2 = TaskMaster.this.jobState.aggregatorsTemplate != null ? TaskMaster.this.fed.getExecutorService().submit(new DiscoverServices(TaskMaster.this.fed, TaskMaster.this.jobState.aggregatorsTemplate, TaskMaster.this.jobState.servicesDiscoveryTimeout)) : null;
            LinkedList linkedList = new LinkedList();
            for (ServicesTemplate servicesTemplate : TaskMaster.this.jobState.servicesTemplates) {
                linkedList.add(new DiscoverServices(TaskMaster.this.fed, servicesTemplate, TaskMaster.this.jobState.servicesDiscoveryTimeout));
            }
            Future[] futureArr = (Future[]) TaskMaster.this.fed.getExecutorService().invokeAll(linkedList).toArray(new Future[linkedList.size()]);
            LinkedList linkedList2 = new LinkedList();
            ServiceItem[] serviceItemArr2 = (ServiceItem[]) submit.get();
            if (serviceItemArr2.length < TaskMaster.this.jobState.clientsTemplate.minMatches) {
                String str = "Not enough services to run clients: found=" + serviceItemArr2.length + ", required=" + TaskMaster.this.jobState.clientsTemplate.minMatches + ", template=" + TaskMaster.this.jobState.clientsTemplate;
                TaskMaster.log.error(str);
                linkedList2.add(new RuntimeException(str));
            }
            if (TaskMaster.this.jobState.aggregatorsTemplate != null) {
                serviceItemArr = (ServiceItem[]) submit2.get();
                if (serviceItemArr.length < TaskMaster.this.jobState.aggregatorsTemplate.minMatches) {
                    String str2 = "Not enough services to run aggregators: found=" + serviceItemArr.length + ", required=" + TaskMaster.this.jobState.aggregatorsTemplate.minMatches + ", template=" + TaskMaster.this.jobState.aggregatorsTemplate;
                    TaskMaster.log.error(str2);
                    linkedList2.add(new RuntimeException(str2));
                }
            } else {
                serviceItemArr = new ServiceItem[0];
            }
            for (int i = 0; i < futureArr.length; i++) {
                Future future = futureArr[i];
                ServicesTemplate servicesTemplate2 = TaskMaster.this.jobState.servicesTemplates[i];
                try {
                    ServiceItem[] serviceItemArr3 = (ServiceItem[]) future.get();
                    if (serviceItemArr3.length < servicesTemplate2.minMatches) {
                        String str3 = "Not enough services: found=" + serviceItemArr3.length + ", required=" + servicesTemplate2.minMatches + ", template=" + servicesTemplate2;
                        TaskMaster.log.error(str3);
                        linkedList2.add(new RuntimeException(str3));
                    }
                } catch (Throwable th) {
                    linkedList2.add(th);
                }
            }
            if (linkedList2.isEmpty()) {
                return new DiscoveredServices(serviceItemArr2, serviceItemArr);
            }
            throw new ExecutionExceptions(linkedList2);
        }
    }

    /**
     * Holder for the service items found for the client template and for the
     * aggregator template. Neither array may be {@code null}.
     */
    public static class DiscoveredServices {
        /** Service items discovered for the client template. */
        public final ServiceItem[] clientServiceItems;
        /** Service items discovered for the aggregator template. */
        public final ServiceItem[] aggregatorServiceItems;

        /**
         * @param serviceItemArr  the client service items
         * @param serviceItemArr2 the aggregator service items
         * @throws IllegalArgumentException if either argument is {@code null}
         */
        public DiscoveredServices(ServiceItem[] serviceItemArr, ServiceItem[] serviceItemArr2) {
            if (serviceItemArr == null || serviceItemArr2 == null) {
                throw new IllegalArgumentException();
            }
            this.clientServiceItems = serviceItemArr;
            this.aggregatorServiceItems = serviceItemArr2;
        }
    }

    /**
     * Serializable description of a job: the values read from the job's
     * {@link Configuration} component plus the client/aggregator service maps.
     * {@code TaskMaster.setupJob()} serializes instances of this class into the
     * job's znode and deserializes them from it.
     */
    public static class JobState implements Serializable {
        private static final long serialVersionUID = -340273551639560974L;
        // transient: not part of the serialized form.
        final transient boolean deleteJob;
        // Set to true by TaskMaster.setupJob() when the state was read back from an existing znode.
        boolean resumedJob = false;
        // Timestamps (System.currentTimeMillis()); 0 means "not set".
        transient long beginMillis = 0;
        transient long endMillis = 0;
        // Futures of the running client tasks, keyed by client number.
        protected transient Map<Integer, Future<?>> futures;
        public final String component;
        public final String jobName;
        public final int nclients;
        public final ServicesTemplate clientsTemplate;
        public final int naggregators;
        public final ServicesTemplate aggregatorsTemplate;
        public final ServicesTemplate[] servicesTemplates;
        public final long servicesDiscoveryTimeout;
        public final boolean forceOverflow;
        public final File indexDumpDir;
        public final String indexDumpNamespace;
        public final ServiceMap clientServiceMap;
        public final ServiceMap aggregatorServiceMap;

        /** @return the current value of the {@code resumedJob} flag */
        public boolean isResumedJob() {
            return this.resumedJob;
        }

        /**
         * @return {@code 0} when {@code beginMillis} is unset; otherwise the time from
         *         {@code beginMillis} to {@code endMillis}, using the current time
         *         while {@code endMillis} is still unset
         */
        public long getElapsedMillis() {
            if (this.beginMillis == 0) {
                return 0L;
            }
            final long end = (this.endMillis == 0) ? System.currentTimeMillis() : this.endMillis;
            return end - this.beginMillis;
        }

        /**
         * Extension hook called by {@link #toString()} just before the closing
         * brace is appended. This implementation appends nothing.
         *
         * @param sb the buffer being built by {@link #toString()}
         */
        public void toString(StringBuilder sb) {
        }

        /** Renders the class name followed by a selection of the state's fields. */
        public String toString() {
            final StringBuilder sb = new StringBuilder();
            sb.append(getClass().getName())
                    .append("{ resumedJob=").append(isResumedJob())
                    .append(", component=").append(this.component)
                    .append(", jobName=").append(this.jobName)
                    .append(", nclients=").append(this.nclients)
                    .append(", clientsTemplate=").append(this.clientsTemplate)
                    .append(", servicesTemplates=").append(Arrays.toString(this.servicesTemplates))
                    .append(", awaitServicesTimeout=").append(this.servicesDiscoveryTimeout)
                    .append(", forceOverflow=").append(this.forceOverflow);
            toString(sb);
            return sb.append("}").toString();
        }

        /**
         * Reads the job description from the given configuration component.
         *
         * @param str           the configuration component name
         * @param configuration the configuration to read the entries from
         * @throws IllegalArgumentException if either argument is {@code null}
         * @throws ConfigurationException   if thrown by {@code configuration.getEntry(...)}
         */
        public JobState(String str, Configuration configuration) throws ConfigurationException {
            if (str == null || configuration == null) {
                throw new IllegalArgumentException();
            }
            this.component = str;
            // The lookup order below is kept deliberately: it decides which entry is reported first on failure.
            this.jobName = (String) configuration.getEntry(str, ConfigurationOptions.JOB_NAME, String.class);
            this.deleteJob = (Boolean) configuration.getEntry(str, ConfigurationOptions.DELETE_JOB, Boolean.TYPE, Boolean.FALSE);
            this.nclients = (Integer) configuration.getEntry(str, ConfigurationOptions.NCLIENTS, Integer.TYPE);
            this.naggregators = (Integer) configuration.getEntry(str, ConfigurationOptions.NAGGREGATORS, Integer.TYPE, 0);
            this.clientsTemplate = (ServicesTemplate) configuration.getEntry(str, ConfigurationOptions.CLIENTS_TEMPLATE, ServicesTemplate.class);
            this.aggregatorsTemplate = (ServicesTemplate) configuration.getEntry(str, ConfigurationOptions.AGGREGATORS_TEMPLATE, ServicesTemplate.class, null);
            this.servicesTemplates = (ServicesTemplate[]) configuration.getEntry(str, ConfigurationOptions.SERVICES_TEMPLATES, ServicesTemplate[].class);
            this.servicesDiscoveryTimeout = (Long) configuration.getEntry(str, ConfigurationOptions.SERVICES_DISCOVERY_TIMEOUT, Long.TYPE);
            this.forceOverflow = (Boolean) configuration.getEntry(str, ConfigurationOptions.FORCE_OVERFLOW, Boolean.TYPE, Boolean.FALSE);
            this.indexDumpDir = (File) configuration.getEntry(str, ConfigurationOptions.INDEX_DUMP_DIR, File.class, null);
            this.indexDumpNamespace = (String) configuration.getEntry(str, ConfigurationOptions.INDEX_DUMP_NAMESPACE, String.class, null);
            this.clientServiceMap = new ServiceMap(this.nclients);
            this.aggregatorServiceMap = new ServiceMap(this.naggregators);
        }

        /** @return {@code <zroot>/<BigdataZooDefs.JOBS>/<component>} */
        public final String getJobClassZPath(JiniFederation jiniFederation) {
            final String jobsRoot = jiniFederation.getZooConfig().zroot + "/" + BigdataZooDefs.JOBS;
            return jobsRoot + "/" + this.component;
        }

        /** @return the job class path followed by {@code /<jobName>} */
        public final String getJobZPath(JiniFederation jiniFederation) {
            final String classPath = getJobClassZPath(jiniFederation);
            return classPath + "/" + this.jobName;
        }

        /** @return the job path followed by {@code /client<i>} */
        public final String getClientZPath(JiniFederation jiniFederation, int i) {
            final String jobPath = getJobZPath(jiniFederation);
            return jobPath + "/client" + i;
        }

        /** @return the client path followed by {@code /locknode} */
        public final String getLockNodeZPath(JiniFederation jiniFederation, int i) {
            final String clientPath = getClientZPath(jiniFederation, i);
            return clientPath + "/locknode";
        }
    }

    /**
     * Returns the federation that was passed to the constructor.
     *
     * @return the federation reference held by this master
     */
    public JiniFederation<?> getFederation() {
        return this.fed;
    }

    /**
     * Returns the current job state. The reference is created in the constructor
     * via {@link #newJobState(String, Configuration)} and may be replaced by
     * {@link #setupJob()} when a pre-existing job is found in zookeeper.
     *
     * @return the current job state
     */
    public S getJobState() {
        return this.jobState;
    }

    /**
     * Submits this master to the federation's executor service and registers a
     * JVM shutdown hook that cancels the resulting future (interrupting it if it
     * is running) and prints a timestamped message to {@code System.err}.
     *
     * @return the future of the submitted master
     */
    protected final Future<Void> innerMain() {
        final Future<Void> jobFuture = this.fed.getExecutorService().submit(this);
        final Thread shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                jobFuture.cancel(true);
                System.err.println("Shutting down: " + new Date());
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
        return jobFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TaskMaster(JiniFederation<?> jiniFederation) throws ConfigurationException {
        if (jiniFederation == null) {
            throw new IllegalArgumentException();
        }
        this.fed = jiniFederation;
        this.jobState = newJobState(System.getProperty("bigdata.component", getClass().getName()), jiniFederation.getClient().getConfiguration());
    }

    /**
     * Starts the job via {@link #innerMain()} and blocks until it finishes.
     * A timestamped "Done" line is written to {@code System.err} whether the job
     * succeeded or failed.
     *
     * @throws InterruptedException if interrupted while waiting for the job
     * @throws ExecutionException   if the job failed
     */
    public void execute() throws InterruptedException, ExecutionException {
        try {
            innerMain().get();
        } finally {
            System.err.println("Done: " + new Date());
        }
    }

    /**
     * Runs the job: {@link #setupJob()}, {@link #beginJob(JobState)},
     * {@link #runJob()}, {@link #success(JobState)}, and finally
     * {@link #tearDownJob(JobState, ZLock)}.
     * <p>
     * {@code tearDownJob} is invoked exactly once after {@code setupJob} has
     * returned, on both the success and the failure path. The optional
     * {@link #forceOverflow()} runs only after a successful job and after the
     * tear down, so a failure there can no longer trigger a second tear down
     * (and a second {@code unlock()} of the job lock).
     *
     * @return always {@code null}
     * @throws Exception whatever the job steps throw; for
     *         {@link InterruptedException}, {@link CancellationException} and
     *         {@link ExecutionException} raised by {@code runJob()} or
     *         {@code success()}, {@link #error(JobState, Throwable)} is called
     *         before the exception is rethrown
     */
    @Override // java.util.concurrent.Callable
    public final Void call() throws Exception {
        final ZLock zLock = setupJob();
        try {
            this.jobState.beginMillis = System.currentTimeMillis();
            beginJob(getJobState());
            try {
                runJob();
                success(this.jobState);
            } catch (InterruptedException | CancellationException | ExecutionException e) {
                error(this.jobState, e);
                throw e;
            }
        } finally {
            // Single tear-down point: releases the job lock exactly once.
            tearDownJob(this.jobState, zLock);
        }
        if (this.jobState.forceOverflow) {
            forceOverflow();
        }
        return null;
    }

    /**
     * Starts all clients and waits for them to finish. When INFO logging is
     * enabled, the outcome ("failure" or the success token) and the elapsed wall
     * clock time in milliseconds are logged whether or not an exception is thrown.
     *
     * @throws Exception whatever {@link #startClients()} or {@link #awaitAll()} throws
     */
    protected void runJob() throws Exception, InterruptedException {
        final long begin = System.currentTimeMillis();
        // Stays true unless both steps below complete normally.
        boolean failed = true;
        try {
            startClients();
            awaitAll();
            failed = false;
        } finally {
            final long elapsed = System.currentTimeMillis() - begin;
            if (log.isInfoEnabled()) {
                log.info("Done: " + (failed ? "failure" : Tokens.SUCCESS) + ", elapsed=" + elapsed);
            }
        }
    }

    /**
     * Submits one client task per client number to the service assigned to it in
     * the job's client service map and records the returned futures in
     * {@code jobState.futures}.
     * <p>
     * If not every client could be started (an exception left the loop early),
     * all futures recorded so far are cancelled. The check is made once, after
     * the loop: performing it per iteration would cancel the clients that had
     * just been started whenever more than one client is configured.
     *
     * @throws IOException      if submitting a task to a remote executor fails
     * @throws RuntimeException if a client has no resolved service item or the
     *                          service does not implement {@link IRemoteExecutor}
     */
    protected void startClients() throws IOException {
        final int nclients = this.jobState.nclients;
        if (log.isInfoEnabled()) {
            log.info("Will run " + nclients);
        }
        this.jobState.futures = new LinkedHashMap<Integer, Future<?>>(nclients);
        // Number of clients that were submitted successfully.
        int nstarted = 0;
        try {
            for (int clientNum = 0; clientNum < nclients; clientNum++) {
                final ServiceItem serviceItem = this.jobState.clientServiceMap.getServiceItem(clientNum);
                if (serviceItem == null) {
                    throw new RuntimeException("ServiceItem not resolved? client#=" + clientNum);
                }
                if (!(serviceItem.service instanceof IRemoteExecutor)) {
                    throw new RuntimeException("Service does not implement " + IRemoteExecutor.class + ", serviceItem=" + serviceItem);
                }
                final IRemoteExecutor executor = (IRemoteExecutor) serviceItem.service;
                final T clientTask = newClientTask(clientNum);
                if (log.isInfoEnabled()) {
                    log.info("Running client#=" + clientNum + " on " + serviceItem);
                }
                this.jobState.futures.put(Integer.valueOf(clientNum), executor.submit(clientTask));
                nstarted++;
            }
        } finally {
            if (nstarted < nclients) {
                log.error("Aborting : could not start client(s): nstarted=" + nstarted + ", nclients=" + nclients);
                cancelAll(true);
            }
        }
    }

    /**
     * Polls {@link #allDone()} until every client future has completed, sleeping
     * one second between polls while fewer than ten futures remain and ten
     * seconds otherwise.
     * <p>
     * An {@link InterruptedException} or {@link ExecutionException} raised while
     * polling or sleeping is logged, all futures are cancelled (failures of the
     * cancel itself are only logged), and the exception is rethrown wrapped in a
     * {@link RuntimeException}; it is not propagated as the declared checked type.
     *
     * @throws RuntimeException wrapping the interrupt or execution failure
     */
    protected void awaitAll() throws ExecutionException, InterruptedException {
        try {
            while (!allDone()) {
                final int remaining = this.jobState.futures.size();
                if (log.isDebugEnabled()) {
                    log.debug("#remaining futures=" + remaining);
                }
                Thread.sleep(remaining < 10 ? 1000L : 10000L);
            }
        } catch (InterruptedException | ExecutionException cause) {
            log.error("Cancelling job: cause=" + cause);
            try {
                cancelAll(true);
            } catch (Throwable cancelFailure) {
                log.error(cancelFailure);
            }
            throw new RuntimeException(cause);
        }
    }

    /**
     * Invoked after the job ran to completion: records the end time, reports the
     * federation's last commit time on {@code System.out} and destroys the job's
     * znode tree in zookeeper.
     *
     * @param s the job state
     * @throws Exception if reading the commit time or destroying the znodes fails
     */
    protected void success(S s) throws Exception {
        s.endMillis = System.currentTimeMillis();
        if (log.isInfoEnabled()) {
            final long elapsed = s.getElapsedMillis();
            log.info("Clients done: elapsed=" + elapsed);
        }
        System.out.println("commit point: " + getFederation().getLastCommitTime());
        final ZooKeeper zk = this.fed.getZookeeperAccessor().getZookeeper();
        final String jobZPath = s.getJobZPath(this.fed);
        ZooHelper.destroyZNodes(zk, jobZPath, 0);
    }

    /**
     * Invoked when the job failed: records the end time, logs the cause and then
     * cancels all client futures. The cancel is attempted even if the logging
     * step throws; a failure of the cancel itself is only printed to
     * {@code System.err}.
     *
     * @param s  the job state
     * @param th the cause of the failure
     */
    protected void error(S s, Throwable th) {
        try {
            s.endMillis = System.currentTimeMillis();
            final long elapsed = s.getElapsedMillis();
            log.error("Abort: elapsed=" + elapsed + " : cause=" + th, th);
        } finally {
            try {
                cancelAll(true);
            } catch (Throwable cancelFailure) {
                cancelFailure.printStackTrace(System.err);
            }
        }
    }

    /**
     * Tear-down hook invoked by {@link #call()}. This implementation only
     * releases the given lock; the job state argument is not used here.
     *
     * @param s     the job state
     * @param zLock the lock returned by {@link #setupJob()}
     * @throws Exception if releasing the lock fails
     */
    protected void tearDownJob(S s, ZLock zLock) throws Exception {
        zLock.unlock();
    }

    /**
     * Factory for the job state. Called from the constructor with the component
     * name (system property {@code bigdata.component}, defaulting to the runtime
     * class name) and the federation client's configuration.
     *
     * @param str           the configuration component name
     * @param configuration the configuration to read from
     * @return the new job state
     * @throws ConfigurationException if the configuration cannot be read
     */
    protected abstract S newJobState(String str, Configuration configuration) throws ConfigurationException;

    /**
     * Factory for the task submitted to the service assigned to the given client
     * number. Called once per client by {@link #startClients()}.
     *
     * @param i the client number, in {@code [0, nclients)}
     * @return the task to submit for that client
     */
    protected abstract T newClientTask(int i);

    /**
     * Hook invoked by {@link #call()} before the job runs. When an index dump
     * directory is configured, three {@code DumpFederation.ScheduledDumpTask}s
     * are registered with the federation: one scheduled with (0, 1) minutes, one
     * with (10, 10) minutes and one with (1, 1) hours. Otherwise nothing happens.
     *
     * @param s the job state
     * @throws Exception declared for the benefit of overriding implementations
     */
    public void beginJob(S s) throws Exception {
        if (s.indexDumpDir == null) {
            return;
        }
        final File dumpDir = s.indexDumpDir;
        final String namespace = s.indexDumpNamespace;
        // NOTE(review): the third constructor argument (10 / 5 / Integer.MAX_VALUE) is passed through
        // unchanged; its meaning is defined by ScheduledDumpTask - confirm there.
        this.fed.addScheduledTask(
                new DumpFederation.ScheduledDumpTask(this.fed, namespace, 10, dumpDir, "indexDump", TimeUnit.MINUTES),
                0L, 1L, TimeUnit.MINUTES);
        this.fed.addScheduledTask(
                new DumpFederation.ScheduledDumpTask(this.fed, namespace, 5, dumpDir, "indexDump", TimeUnit.MINUTES),
                10L, 10L, TimeUnit.MINUTES);
        this.fed.addScheduledTask(
                new DumpFederation.ScheduledDumpTask(this.fed, namespace, Integer.MAX_VALUE, dumpDir, "indexDump", TimeUnit.MINUTES),
                1L, 1L, TimeUnit.HOURS);
    }

    /**
     * Prepares the job in zookeeper and returns the (held) job lock.
     * <p>
     * Steps, in order:
     * <ol>
     * <li>Ensure the jobs znode and the job-class znode exist (an already
     * existing node is not an error).</li>
     * <li>Acquire the lock {@code <jobClassZPath>/locknode_<jobName>}.</li>
     * <li>If {@code deleteJob} is set and the job znode exists, destroy it and
     * detach the job's performance counters.</li>
     * <li>Create the job znode with the serialized job state; if it already
     * exists, replace the in-memory job state with the stored one, resolve its
     * service UUIDs and mark it as a resumed job.</li>
     * <li>Discover services, assign clients and aggregators to them and write
     * the updated job state back to the job znode.</li>
     * </ol>
     * If step 5 fails, the job znode is deleted (best effort). On any failure
     * after the lock was acquired, the lock is released before the exception
     * leaves this method.
     *
     * @return the held lock; the caller is responsible for releasing it
     * @throws KeeperException      on zookeeper failures outside of step 5
     * @throws InterruptedException if interrupted outside of step 5
     * @throws RuntimeException     for every other failure; failures in step 5
     *                              are wrapped by the inner handler and then
     *                              wrapped again by the outer handler
     */
    protected ZLock setupJob() throws KeeperException, InterruptedException, TimeoutException {
        ZooKeeper zookeeper = this.fed.getZookeeperAccessor().getZookeeper();
        try {
            zookeeper.create(this.fed.getZooConfig().zroot + "/" + BigdataZooDefs.JOBS, new byte[0], this.fed.getZooConfig().acl, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            // Ignored on purpose: the jobs root already exists.
        }
        String jobClassZPath = this.jobState.getJobClassZPath(this.fed);
        try {
            zookeeper.create(jobClassZPath, new byte[0], this.fed.getZooConfig().acl, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e2) {
            // Ignored on purpose: the job-class node already exists.
        }
        ZLockImpl lock = ZLockImpl.getLock(zookeeper, jobClassZPath + "/locknode_" + this.jobState.jobName, this.fed.getZooConfig().acl);
        lock.lock();
        // From here on every exit path that throws must release the lock.
        try {
            String jobZPath = this.jobState.getJobZPath(this.fed);
            if (this.jobState.deleteJob && zookeeper.exists(jobZPath, false) != null) {
                log.warn("Deleting old job: " + jobZPath);
                ZooHelper.destroyZNodes(this.fed.getZookeeperAccessor().getZookeeper(), jobZPath, 0);
                detachPerformanceCounters();
            }
            try {
                // New job: publish our freshly configured state.
                zookeeper.create(jobZPath, SerializerUtil.serialize(this.jobState), this.fed.getZooConfig().acl, CreateMode.PERSISTENT);
                if (log.isInfoEnabled()) {
                    log.info("New job: " + this.jobState);
                }
            } catch (KeeperException.NodeExistsException e3) {
                // Pre-existing job: adopt the stored state instead of the configured one.
                this.jobState = (S) SerializerUtil.deserialize(zookeeper.getData(jobZPath, false, new Stat()));
                this.jobState.clientServiceMap.resolveServiceUUIDs(this.fed);
                this.jobState.aggregatorServiceMap.resolveServiceUUIDs(this.fed);
                this.jobState.resumedJob = true;
                log.warn("Pre-existing job: " + jobZPath);
            }
            try {
                // Runs synchronously in the calling thread.
                DiscoveredServices call = new DiscoverServicesWithPreconditionsTask().call();
                this.jobState.clientServiceMap.assignClientsToServices(call.clientServiceItems);
                this.jobState.aggregatorServiceMap.assignClientsToServices(call.aggregatorServiceItems);
                // Version -1: overwrite regardless of the znode's current version.
                zookeeper.setData(jobZPath, SerializerUtil.serialize(this.jobState), -1);
                if (log.isInfoEnabled()) {
                    log.info("Wrote client assignments into zookeeper.");
                }
                return lock;
            } catch (Throwable th) {
                // Best-effort removal of the job znode; a failure here is only logged.
                try {
                    zookeeper.delete(jobZPath, -1);
                } catch (Throwable th2) {
                    log.error(th2);
                }
                throw new RuntimeException(th);
            }
        } catch (InterruptedException e4) {
            lock.unlock();
            throw e4;
        } catch (KeeperException e5) {
            lock.unlock();
            throw e5;
        } catch (Throwable th3) {
            // Also reached for the RuntimeException thrown by the inner handler above.
            lock.unlock();
            throw new RuntimeException(th3);
        }
    }

    /**
     * Detaches the child named after the current job from the {@code "Jobs"}
     * path of the federation's service counter set.
     */
    protected void detachPerformanceCounters() {
        getFederation().getServiceCounterSet().makePath("Jobs").detach(this.jobState.jobName);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Attaches the given counters under {@code Jobs/<jobName>} in the
     * federation's service counter set.
     *
     * @param counterSet the counters to attach
     * @throws IllegalArgumentException if {@code counterSet} is {@code null}
     */
    public void attachPerformanceCounters(CounterSet counterSet) {
        if (counterSet == null) {
            throw new IllegalArgumentException();
        }
        // NOTE(review): the boolean passed to attach() is defined by CounterSet - confirm its meaning there.
        getFederation().getServiceCounterSet().makePath("Jobs").makePath(getJobState().jobName).attach(counterSet, true);
    }

    /**
     * Checks the client futures once. Every future that has completed is
     * reported on {@code System.out}, passed to
     * {@link #notifyOutcome(int, Object)} (exceptions thrown by that hook are
     * logged and ignored) and then removed from {@code jobState.futures}.
     * <p>
     * Removal happens after the scan, so if a completed future's
     * {@code get()} throws, no entries are removed by this call.
     *
     * @return {@code true} when no futures remain
     * @throws IllegalStateException if the futures map has not been created yet
     * @throws InterruptedException  if interrupted while obtaining a result
     * @throws ExecutionException    if a completed client task failed
     */
    protected boolean allDone() throws InterruptedException, ExecutionException {
        if (this.jobState.futures == null) {
            throw new IllegalStateException();
        }
        // Client numbers whose futures completed during this scan.
        final LinkedList<Integer> finished = new LinkedList<Integer>();
        int remaining = this.jobState.futures.size();
        for (Map.Entry<Integer, Future<?>> entry : this.jobState.futures.entrySet()) {
            final Future<?> future = entry.getValue();
            if (!future.isDone()) {
                continue;
            }
            final int clientNum = entry.getKey().intValue();
            // The futures are stored as Future<?>; they were obtained by submitting T (a Callable<U>)
            // in startClients(), so the result is expected to be a U.
            @SuppressWarnings("unchecked")
            final U outcome = (U) future.get();
            remaining--;
            System.out.println("Done: " + new Date() + " : clientNum=" + clientNum + " of " + this.jobState.nclients + " with " + remaining + " remaining : result=" + outcome);
            try {
                notifyOutcome(clientNum, outcome);
            } catch (Throwable th) {
                log.error("Ignoring thrown exception: " + th);
            }
            finished.add(Integer.valueOf(clientNum));
        }
        for (Integer clientNum : finished) {
            this.jobState.futures.remove(clientNum);
        }
        return this.jobState.futures.isEmpty();
    }

    /**
     * Cancels every client future that is not done yet and removes all entries
     * from {@code jobState.futures}. Does nothing when the futures map has not
     * been created.
     *
     * @param z passed to {@link Future#cancel(boolean)} as
     *          {@code mayInterruptIfRunning}
     */
    protected synchronized void cancelAll(boolean z) {
        final Map<Integer, Future<?>> futures = this.jobState.futures;
        if (futures == null) {
            return;
        }
        log.warn("Cancelling all futures: nfutures=" + futures.size());
        for (Iterator<Future<?>> it = futures.values().iterator(); it.hasNext(); ) {
            final Future<?> future = it.next();
            if (!future.isDone()) {
                future.cancel(z);
            }
            // Each entry is dropped right after it has been handled.
            it.remove();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Triggers an overflow on the federation, printing a timestamped line to
     * {@code System.out} before and after the call.
     */
    public void forceOverflow() {
        System.out.println("Forcing overflow: now=" + new Date());
        // NOTE(review): the two boolean flags are defined by JiniFederation.forceOverflow - confirm their meaning there.
        this.fed.forceOverflow(true, true);
        System.out.println("Forced overflow: now=" + new Date());
    }

    /**
     * Hook invoked by {@link #allDone()} for each client future that has
     * completed. This implementation does nothing. Exceptions thrown from an
     * override are logged and ignored by the caller.
     *
     * @param i the client number
     * @param u the value returned by that client's future
     */
    protected void notifyOutcome(int i, U u) {
    }
}
