package com.bigdata.bop.fed;

import com.bigdata.bop.BOp;
import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IPredicate;
import com.bigdata.bop.IShardwisePipelineOp;
import com.bigdata.bop.engine.IChunkHandler;
import com.bigdata.bop.engine.IChunkMessage;
import com.bigdata.bop.engine.IQueryPeer;
import com.bigdata.bop.engine.IRunningQuery;
import com.bigdata.bop.engine.LocalChunkMessage;
import com.bigdata.bop.engine.StandaloneChunkHandler;
import com.bigdata.bop.fed.shards.MapBindingSetsOverShardsBuffer;
import com.bigdata.io.DirectBufferPoolAllocator;
import com.bigdata.mdi.PartitionLocator;
import com.bigdata.relation.accesspath.BlockingBuffer;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.IBlockingBuffer;
import com.bigdata.relation.accesspath.IBuffer;
import java.rmi.RemoteException;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/bigdata/bop/fed/FederationChunkHandler.class */
/**
 * Chunk handler used when a query runs against a federation. Depending on the
 * evaluation context of the target operator, a chunk of solutions is either
 * handled by the {@link StandaloneChunkHandler} superclass, split up by shard
 * and sent to the service hosting each shard, or sent to the query controller.
 *
 * @param <E> the element type of the predicate of a sharded target operator.
 */
public class FederationChunkHandler<E> extends StandaloneChunkHandler {
    private static final Logger log = Logger.getLogger(FederationChunkHandler.class);

    /**
     * Default instance. With a threshold of {@link Integer#MAX_VALUE} no chunk
     * can exceed the threshold, so remote chunks are always sent as
     * {@link ThickChunkMessage}s.
     */
    public static final IChunkHandler INSTANCE = new FederationChunkHandler<Object>(Integer.MAX_VALUE, false);

    /** Instance which sends {@link LocalChunkMessage}s even to remote peers. */
    public static final IChunkHandler TEST_INSTANCE = new FederationChunkHandler<Object>(Integer.MAX_VALUE, true);

    /** Chunks with more solutions than this are sent as {@link NIOChunkMessage}s. */
    private final int nioThreshold;

    /** When true, remote chunks are wrapped in a {@link LocalChunkMessage}. */
    private final boolean usePOJO;

    /**
     * @param i the NIO threshold: the largest chunk size (in solutions) which is
     *          still sent as a {@link ThickChunkMessage}.
     * @param z when <code>true</code>, remote peers are sent
     *          {@link LocalChunkMessage}s regardless of the chunk size.
     */
    public FederationChunkHandler(int i, boolean z) {
        super(false);
        this.nioThreshold = i;
        this.usePOJO = z;
    }

    /**
     * Routes a chunk of solutions to the operator identified by <code>i2</code>.
     *
     * @param iRunningQuery  the running query; it is cast to
     *                       {@link FederatedRunningQuery}.
     * @param i              the id of the operator which produced the chunk
     *                       (logged as <code>sourceBopId</code>).
     * @param i2             the id of the operator which will consume the chunk
     *                       (logged as <code>sink</code>).
     * @param iBindingSetArr the solutions to be routed.
     * @return <code>0</code> for an empty chunk; the superclass result for the
     *         ANY context; the number of non-empty per-shard messages sent for
     *         the SHARDED context; <code>1</code> for the CONTROLLER context.
     * @throws IllegalArgumentException      if the query or the chunk is null.
     * @throws IllegalStateException         if no operator is registered under
     *                                       <code>i2</code>.
     * @throws UnsupportedOperationException for the HASHED context.
     */
    @Override // com.bigdata.bop.engine.StandaloneChunkHandler, com.bigdata.bop.engine.IChunkHandler
    public int handleChunk(IRunningQuery iRunningQuery, int i, int i2, IBindingSet[] iBindingSetArr) {
        if (iRunningQuery == null) {
            throw new IllegalArgumentException();
        }
        if (iBindingSetArr == null) {
            throw new IllegalArgumentException();
        }
        if (iBindingSetArr.length == 0) {
            // Nothing to route.
            return 0;
        }
        final FederatedRunningQuery query = (FederatedRunningQuery) iRunningQuery;
        final BOp targetOp = query.getBOpIndex().get(Integer.valueOf(i2));
        if (targetOp == null) {
            throw new IllegalStateException("Not found: " + i2);
        }
        if (log.isTraceEnabled()) {
            log.trace("queryId=" + iRunningQuery.getQueryId() + ", sourceBopId=" + i + ", sink=" + i2);
        }
        switch (targetOp.getEvaluationContext()) {
            case ANY:
                return super.handleChunk(iRunningQuery, i, i2, iBindingSetArr);
            case HASHED:
                throw new UnsupportedOperationException();
            case SHARDED:
                return mapOverShards(query, targetOp, i2, iBindingSetArr);
            case CONTROLLER:
                // The whole chunk goes to the query controller; -1 is passed as the partition id.
                sendChunkMessage(query, query.queryControllerUUID, i2, -1, query.getAllocationContext(new QueryContext(query.getQueryId())), iBindingSetArr);
                return 1;
            default:
                throw new AssertionError(targetOp.getEvaluationContext());
        }
    }

    /**
     * Splits the solutions by shard using the predicate of the target operator
     * and sends one message per shard which received at least one solution.
     *
     * @param query     the running query.
     * @param targetOp  the sharded operator which will consume the solutions;
     *                  it is cast to {@link IShardwisePipelineOp}.
     * @param sinkId    the id of <code>targetOp</code>.
     * @param solutions the solutions to be mapped over the shards.
     * @return the number of messages which were sent.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" }) // raw cast to IShardwisePipelineOp, as in the original code.
    private int mapOverShards(final FederatedRunningQuery query, final BOp targetOp, final int sinkId, final IBindingSet[] solutions) {
        final IPredicate<E> predicate = ((IShardwisePipelineOp) targetOp).getPredicate();
        final MapBindingSetsOverShardsBuffer<IBindingSet, E> mapper = new MapBindingSetsOverShardsBuffer<IBindingSet, E>(query.getFederation(), predicate, predicate.getTimestamp(), 1000) {
            @Override // com.bigdata.bop.fed.shards.MapBindingSetsOverShardsBuffer
            protected IBuffer<IBindingSet[]> newBuffer(PartitionLocator partitionLocator) {
                return new BlockingBuffer<IBindingSet[]>(Integer.MAX_VALUE, 100, 20L, BlockingBuffer.DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT);
            }
        };
        for (IBindingSet solution : solutions) {
            mapper.add(solution);
        }
        mapper.flush();

        final DirectBufferPoolAllocator.IAllocationContext allocationContext = query.getAllocationContext(new QueryContext(query.getQueryId()));
        int messagesSent = 0;
        for (Map.Entry<PartitionLocator, IBuffer<IBindingSet[]>> entry : mapper.getSinks().entrySet()) {
            final PartitionLocator locator = entry.getKey();
            // newBuffer() above always creates a BlockingBuffer, hence the downcast.
            final IBlockingBuffer<IBindingSet[]> sink = (IBlockingBuffer<IBindingSet[]>) entry.getValue();
            // Nothing more will be added to this sink.
            sink.close();
            final IBindingSet[] shardChunk = drainSink(sink);
            if (shardChunk.length > 0) {
                sendChunkMessage(query, locator.getDataServiceUUID(), sinkId, locator.getPartitionId(), allocationContext, shardChunk);
                messagesSent++;
            }
        }
        return messagesSent;
    }

    /**
     * Reads every chunk from the sink and concatenates them into one array.
     *
     * @param sink the buffer to drain.
     * @return all solutions read from the sink, in iteration order; possibly
     *         empty.
     */
    private static IBindingSet[] drainSink(final IBlockingBuffer<IBindingSet[]> sink) {
        final LinkedList<IBindingSet[]> chunks = new LinkedList<IBindingSet[]>();
        int total = 0;
        final IAsynchronousIterator<IBindingSet[]> itr = sink.iterator();
        /*
         * The try/finally wraps the whole loop so that the iterator is closed
         * exactly once: after it has been drained, or if draining fails. Closing
         * it inside the loop body would close it after the first chunk and would
         * never close it when the sink yields no chunks.
         */
        try {
            while (itr.hasNext()) {
                final IBindingSet[] chunk = itr.next();
                chunks.add(chunk);
                total += chunk.length;
            }
        } finally {
            itr.close();
        }
        final IBindingSet[] combined = new IBindingSet[total];
        int offset = 0;
        for (IBindingSet[] chunk : chunks) {
            System.arraycopy(chunk, 0, combined, offset, chunk.length);
            offset += chunk.length;
        }
        return combined;
    }

    /**
     * Sends a chunk of solutions to the query peer running on the given service.
     * When that peer is the local query engine the chunk is handed directly to
     * the running query; otherwise the out-bound counters are updated and the
     * peer is notified through {@link IQueryPeer#bufferReady(IChunkMessage)}.
     *
     * @param federatedRunningQuery the running query.
     * @param uuid                  the UUID of the service to receive the chunk.
     * @param i                     the id of the operator which will consume the
     *                              chunk.
     * @param i2                    the partition id placed in the message
     *                              (callers in this class pass -1 when
     *                              targeting the query controller).
     * @param iAllocationContext    the allocation context handed to an
     *                              {@link NIOChunkMessage}.
     * @param iBindingSetArr        the solutions to send.
     * @throws IllegalArgumentException if the service UUID, the allocation
     *                                  context or the chunk is null.
     * @throws RuntimeException         if no query peer is found for the service
     *                                  or if the remote notification fails.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" }) // raw LocalChunkMessage, as in the original code.
    protected void sendChunkMessage(FederatedRunningQuery federatedRunningQuery, UUID uuid, int i, int i2, DirectBufferPoolAllocator.IAllocationContext iAllocationContext, IBindingSet[] iBindingSetArr) {
        if (uuid == null) {
            throw new IllegalArgumentException();
        }
        if (iAllocationContext == null) {
            throw new IllegalArgumentException();
        }
        if (iBindingSetArr == null) {
            throw new IllegalArgumentException();
        }
        final IQueryPeer queryPeer = federatedRunningQuery.getQueryPeer(uuid);
        if (queryPeer == null) {
            throw new RuntimeException("Not found: serviceId=" + uuid);
        }
        if (queryPeer == federatedRunningQuery.getQueryEngine()) {
            // The target is this query engine: hand the chunk over directly.
            final LocalChunkMessage localMessage = new LocalChunkMessage(federatedRunningQuery.getQueryController(), federatedRunningQuery.getQueryId(), i, i2, iBindingSetArr);
            if (log.isDebugEnabled()) {
                log.debug("Sending local message: " + localMessage);
            }
            federatedRunningQuery.acceptChunk(localMessage);
            return;
        }
        final IChunkMessage<IBindingSet> remoteMessage = newRemoteMessage(federatedRunningQuery, i, i2, iAllocationContext, iBindingSetArr);
        if (log.isDebugEnabled()) {
            log.debug("Sending remote message: " + remoteMessage);
        }
        final FederatedQueryEngineCounters counters = federatedRunningQuery.getQueryEngine().getQueryEngineCounters();
        counters.chunksOut.increment();
        counters.solutionsOut.add(iBindingSetArr.length);
        try {
            queryPeer.bufferReady(remoteMessage);
        } catch (RemoteException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the message used to send a chunk to a remote peer: a
     * {@link LocalChunkMessage} when POJO messages were requested, else a
     * {@link ThickChunkMessage} when the chunk does not exceed the NIO
     * threshold, else an {@link NIOChunkMessage}.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" }) // raw message constructors, as in the original code.
    private IChunkMessage<IBindingSet> newRemoteMessage(final FederatedRunningQuery query, final int sinkId, final int partitionId, final DirectBufferPoolAllocator.IAllocationContext allocationContext, final IBindingSet[] solutions) {
        if (this.usePOJO) {
            return new LocalChunkMessage(query.getQueryController(), query.getQueryId(), sinkId, partitionId, solutions);
        }
        if (solutions.length <= this.nioThreshold) {
            return new ThickChunkMessage(query.getQueryController(), query.getQueryId(), sinkId, partitionId, solutions);
        }
        return new NIOChunkMessage(query.getQueryController(), query.getQueryId(), sinkId, partitionId, allocationContext, solutions, query.getQueryEngine().getResourceService().getAddr());
    }
}
