package com.bigdata.relation.accesspath;

import com.bigdata.util.DaemonThreadFactory;
import com.bigdata.util.InnerCause;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import junit.framework.TestCase2;

/* loaded from: input_file:com/bigdata/relation/accesspath/TestBlockingBuffer.class */
public class TestBlockingBuffer extends TestCase2 {
    private final ExecutorService service;

    public TestBlockingBuffer() {
        this.service = Executors.newCachedThreadPool(DaemonThreadFactory.defaultThreadFactory());
    }

    public TestBlockingBuffer(String str) {
        super(str);
        this.service = Executors.newCachedThreadPool(DaemonThreadFactory.defaultThreadFactory());
    }

    protected void tearDown() throws Exception {
        this.service.shutdownNow();
        super.tearDown();
    }

    public void test_blockingBuffer() throws InterruptedException, ExecutionException, TimeoutException {
        final Object obj = new Object();
        final Object obj2 = new Object();
        final Object obj3 = new Object();
        final BlockingBuffer blockingBuffer = new BlockingBuffer(3);
        assertTrue(blockingBuffer.isOpen());
        assertTrue(blockingBuffer.isEmpty());
        assertEquals("chunkCount", 0L, blockingBuffer.getChunksAddedCount());
        assertEquals("elementCount", 0L, blockingBuffer.getElementsAddedCount());
        final IAsynchronousIterator it = blockingBuffer.iterator();
        assertFalse(it.hasNext(1L, TimeUnit.NANOSECONDS));
        assertNull(it.next(1L, TimeUnit.NANOSECONDS));
        blockingBuffer.add(obj);
        assertTrue(blockingBuffer.isOpen());
        assertFalse(blockingBuffer.isEmpty());
        assertEquals("chunkCount", 0L, blockingBuffer.getChunksAddedCount());
        assertEquals("elementCount", 1L, blockingBuffer.getElementsAddedCount());
        assertTrue(it.hasNext(1L, TimeUnit.NANOSECONDS));
        assertTrue(it.hasNext());
        blockingBuffer.add(obj2);
        assertTrue(blockingBuffer.isOpen());
        assertFalse(blockingBuffer.isEmpty());
        assertEquals("chunkCount", 0L, blockingBuffer.getChunksAddedCount());
        assertEquals("elementCount", 2L, blockingBuffer.getElementsAddedCount());
        Future<?> submit = this.service.submit(new Runnable() { // from class: com.bigdata.relation.accesspath.TestBlockingBuffer.1
            @Override // java.lang.Runnable
            public void run() {
                blockingBuffer.add(obj3);
                blockingBuffer.close();
            }
        });
        Future<?> submit2 = this.service.submit(new Runnable() { // from class: com.bigdata.relation.accesspath.TestBlockingBuffer.2
            @Override // java.lang.Runnable
            public void run() {
                TestCase.assertEquals(obj, it.next());
                TestCase.assertEquals(obj2, it.next());
                TestCase.assertEquals(obj3, it.next());
                try {
                    TestCase.assertFalse(it.hasNext(1L, TimeUnit.NANOSECONDS));
                    TestCase.assertNull(it.next(1L, TimeUnit.NANOSECONDS));
                    TestCase.assertFalse(it.hasNext());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        submit.get(100L, TimeUnit.MILLISECONDS);
        submit2.get(100L, TimeUnit.MILLISECONDS);
    }

    public void test_blockingBuffer_close_noConsumer() throws Exception {
        final Object obj = new Object();
        final BlockingBuffer blockingBuffer = new BlockingBuffer(2);
        final AtomicReference atomicReference = new AtomicReference();
        Runnable runnable = new Runnable() { // from class: com.bigdata.relation.accesspath.TestBlockingBuffer.1Producer
            @Override // java.lang.Runnable
            public void run() {
                try {
                    blockingBuffer.add(obj);
                } catch (Throwable th) {
                    if (atomicReference.compareAndSet(null, th)) {
                        TestBlockingBuffer.log.warn("First producer cause: " + th);
                    }
                    throw new RuntimeException(th);
                }
            }
        };
        runnable.run();
        runnable.run();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(DaemonThreadFactory.defaultThreadFactory());
        try {
            Future<?> submit = newSingleThreadExecutor.submit(new Runnable() { // from class: com.bigdata.relation.accesspath.TestBlockingBuffer.1Producer
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        blockingBuffer.add(obj);
                    } catch (Throwable th) {
                        if (atomicReference.compareAndSet(null, th)) {
                            TestBlockingBuffer.log.warn("First producer cause: " + th);
                        }
                        throw new RuntimeException(th);
                    }
                }
            });
            Thread.sleep(200L);
            assertFalse(submit.isDone());
            blockingBuffer.close();
            try {
                submit.get(1L, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                if (!InnerCause.isInnerCause(e, BufferClosedException.class)) {
                    fail("Unexpected cause: " + e, e);
                }
            }
        } finally {
            newSingleThreadExecutor.shutdownNow();
            newSingleThreadExecutor.awaitTermination(1L, TimeUnit.SECONDS);
        }
    }

    public void test_blockingBuffer_close_noConsumer_closedThroughIterator() throws Exception {
        final Object obj = new Object();
        final BlockingBuffer blockingBuffer = new BlockingBuffer(2);
        final AtomicReference atomicReference = new AtomicReference();
        Runnable runnable = new Runnable() { // from class: com.bigdata.relation.accesspath.TestBlockingBuffer.2Producer
            @Override // java.lang.Runnable
            public void run() {
                try {
                    blockingBuffer.add(obj);
                } catch (Throwable th) {
                    if (atomicReference.compareAndSet(null, th)) {
                        TestBlockingBuffer.log.warn("First producer cause: " + th);
                    }
                    throw new RuntimeException(th);
                }
            }
        };
        runnable.run();
        runnable.run();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(DaemonThreadFactory.defaultThreadFactory());
        try {
            FutureTask futureTask = new FutureTask(new Runnable() { // from class: com.bigdata.relation.accesspath.TestBlockingBuffer.2Producer
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        blockingBuffer.add(obj);
                    } catch (Throwable th) {
                        if (atomicReference.compareAndSet(null, th)) {
                            TestBlockingBuffer.log.warn("First producer cause: " + th);
                        }
                        throw new RuntimeException(th);
                    }
                }
            }, (Void) null);
            blockingBuffer.setFuture(futureTask);
            newSingleThreadExecutor.submit(futureTask);
            Thread.sleep(200L);
            assertFalse(futureTask.isDone());
            blockingBuffer.iterator().close();
            try {
                futureTask.get(1L, TimeUnit.SECONDS);
            } catch (CancellationException e) {
                if (log.isInfoEnabled()) {
                    log.info("Ignoring expected exception: " + e);
                }
            }
        } finally {
            newSingleThreadExecutor.shutdownNow();
            newSingleThreadExecutor.awaitTermination(1L, TimeUnit.SECONDS);
        }
    }

    public void test_blockingBuffer_close() throws InterruptedException, ExecutionException, TimeoutException {
        assertTrue(true);
        final Object obj = new Object();
        final BlockingBuffer blockingBuffer = new BlockingBuffer(1000);
        final IAsynchronousIterator it = blockingBuffer.iterator();
        final AtomicReference atomicReference = new AtomicReference();
        final AtomicReference atomicReference2 = new AtomicReference();
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(10, DaemonThreadFactory.defaultThreadFactory());
        ScheduledFuture<?> scheduleAtFixedRate = newScheduledThreadPool.scheduleAtFixedRate(new Runnable() { // from class: com.bigdata.relation.accesspath.TestBlockingBuffer.3Producer
            @Override // java.lang.Runnable
            public void run() {
                try {
                    blockingBuffer.add(obj);
                } catch (Throwable th) {
                    if (atomicReference.compareAndSet(null, th)) {
                        TestBlockingBuffer.log.warn("First producer cause: " + th);
                    }
                    throw new RuntimeException(th);
                }
            }
        }, 0L, 1L, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> scheduleAtFixedRate2 = newScheduledThreadPool.scheduleAtFixedRate(new Runnable() { // from class: com.bigdata.relation.accesspath.TestBlockingBuffer.1Consumer
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (it.hasNext()) {
                        it.next();
                    }
                } catch (Throwable th) {
                    if (atomicReference2.compareAndSet(null, th)) {
                        TestBlockingBuffer.log.warn("First consumer cause: " + th);
                    }
                    throw new RuntimeException(th);
                }
            }
        }, 0L, 2L, TimeUnit.MILLISECONDS);
        while (blockingBuffer.size() != 1000) {
            try {
                try {
                    Thread.sleep(1L);
                } catch (Throwable th) {
                    scheduleAtFixedRate2.cancel(true);
                    scheduleAtFixedRate.cancel(true);
                    fail("Test failure: rootCause=" + th, th);
                    newScheduledThreadPool.shutdownNow();
                    if (scheduleAtFixedRate2.isDone() && scheduleAtFixedRate.isDone()) {
                        return;
                    }
                    newScheduledThreadPool.awaitTermination(5L, TimeUnit.SECONDS);
                    return;
                }
            } catch (Throwable th2) {
                newScheduledThreadPool.shutdownNow();
                if (!scheduleAtFixedRate2.isDone() || !scheduleAtFixedRate.isDone()) {
                    newScheduledThreadPool.awaitTermination(5L, TimeUnit.SECONDS);
                }
                throw th2;
            }
        }
        log.warn("Buffer is full: " + blockingBuffer.toString());
        blockingBuffer.close();
        while (!blockingBuffer.isEmpty()) {
            assertFalse(scheduleAtFixedRate2.isDone());
            Thread.sleep(2L);
        }
        log.info("Buffer has been drained.");
        try {
            scheduleAtFixedRate.get(50L, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            if (!InnerCause.isInnerCause(e, BufferClosedException.class)) {
                throw e;
            }
            newScheduledThreadPool.shutdownNow();
            if (scheduleAtFixedRate2.isDone() && scheduleAtFixedRate.isDone()) {
                return;
            }
            newScheduledThreadPool.awaitTermination(5L, TimeUnit.SECONDS);
            return;
        } catch (TimeoutException e2) {
            fail("Producer is still running (blocked): " + e2, e2);
        }
        assertFalse(scheduleAtFixedRate2.isDone());
        newScheduledThreadPool.shutdownNow();
        if (scheduleAtFixedRate2.isDone() && scheduleAtFixedRate.isDone()) {
            return;
        }
        newScheduledThreadPool.awaitTermination(5L, TimeUnit.SECONDS);
    }

    public void _testStress_blockingBuffer_close() throws InterruptedException, ExecutionException, TimeoutException {
        for (int i = 0; i < 100; i++) {
            try {
                test_blockingBuffer_close();
            } catch (Throwable th) {
                fail("Failed @ i=" + i + " : " + th, th);
            }
        }
    }
}
