P D F . B L A C K

Loading

Use case

I need to implement asynchronous rendering the PDF pages where the rendering task is executed according to a priority indicator. The use case is a panel where the user can see thumbnails of the PDF pages and what I want to happen is that visible thumbnails are rendered first. Whenever the users scrolls the panel or the zoom level changes making new thumbnails visible, I need to submit a rendering task at high priority for those thumbnails so that they will be rendered as quickly as possible without having to wait for not visible thumbnails to be rendered.

Tools

  • Java 16
  • ThreadPoolExecutor
  • PriorityBlockingQueue

Comparable Runnable

We will extend ThreadPoolExecutor passing a PriorityBlockingQueue as work queue where Runnable rendering tasks will be stored. The PriorityBlockingQueue is a thread safe queue where elements are ordered according to their natural order so, for this to work, we will need to create comparable Runnables and submit them to the executor. An alternative is to pass a Comparator to the queue constructor but you will still need the interface to add a priority indicator to the Runnables

First we need an interface to compare priorities (we’ll need this for Runnables and RunnableFutures):


interface WithPriority extends Comparable {

    int priority();

    @Override
    default int compareTo(WithPriority o) {
        requireNonNull(o, "Cannot compare to a null object");
        return Integer.compare(this.priority(), o.priority());
    }
}

And then we create a Runnable implementing the interface:


class RunnableWithPriority implements Runnable, WithPriority {

        private Runnable r;
        private int priority;

        public RunnableWithPriority(Runnable wrapped, int priority) {
            this.r = wrapped;
            this.priority = priority;
        }

        @Override
        public int priority() {
            return priority;
        }

        @Override
        public void run() {
            r.run();
        }
}

Submit and execute

There are a couple of ways to submit new tasks to a ThreadPoolExecutor, through the submit methods or the execute method. We need to make sure that each one of them adds a WithPriority to the work queue.
The ThreadPoolExecutor::submit comes in few overridden versions inherited from AbstractExecutorService which provides a newTaskFor hook for us to override and provide our own RunnableFuture implementation. The newly created RunnableFuture is then executed using the ThreadPoolExecutor::execute which contains the logic to either run the task or store it in the work queue so for this to work, we need something that implements RunnableFuture & WithPriority.

Comparable FutureTask

This is our RunnableFuture & WithPriority. I also added a couple of factory methods that we will use later.


final class PriorityTask extends FutureTask implements WithPriority {

    private int priority = 0;

    private PriorityTask(Callable callable, int priority) {
        super(callable);
        this.priority = priority;
    }

    private PriorityTask(Runnable runnable, V result, int priority) {
        super(runnable, result);
        this.priority = priority;
    }

    @Override
    public int priority() {
        return priority;
    }

    static  PriorityTask withPriority(Callable callable, int priority) {
        return new PriorityTask<>(callable, priority);
    }

    static  PriorityTask withPriority(Runnable runnable, V result, int priority) {
        return new PriorityTask<>(runnable, result, priority);
    }

    static  PriorityTask defaultPriority(Callable callable) {
        return withPriority(callable, 0);
    }

    static  PriorityTask defaultPriority(Runnable runnable, V result) {
        return withPriority(runnable, result, 0);
    }

PriorityExecutor

We now have all the pieces that we need to create our own ThreadPoolExecutor backed by a PriorityBlockingQueue and where we make sure that every task executed is a WithPriority. We have also overridden the afterExecute, you can see its javadoc for details.


public class PriorityExecutor extends ThreadPoolExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(PriorityExecutor.class);

    private final AtomicInteger threadsCounter = new AtomicInteger(1);

    public PriorityExecutor(int threadPoolSize) {
        super(threadPoolSize, threadPoolSize, 0, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
    }

    @Override
    public ThreadFactory getThreadFactory() {
        return runnable -> {
            Thread t = new Thread(runnable);
            t.setDaemon(true);
            t.setName("rendering-thread-" + threadsCounter.getAndIncrement());
            return t;
        };
    }

    @Override
    protected  RunnableFuture newTaskFor(Runnable runnable, T value) {
        if (runnable instanceof WithPriority p) {
            return withPriority(runnable, value, p.priority());
        }
        return defaultPriority(runnable, value);
    }

    @Override
    protected  RunnableFuture newTaskFor(Callable callable) {
        if (callable instanceof WithPriority p) {
            return withPriority(callable, p.priority());
        }
        return defaultPriority(callable);
    }

    @Override
    public void execute(Runnable command) {
        requireNonNull(command);
        if (!(command instanceof WithPriority)) {
            super.execute(defaultPriority(command, null));
        } else {
            super.execute(command);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (isNull(t) && r instanceof Future future) {
            try {
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread()
                      .interrupt();
            }
        }
        if (nonNull(t)) {
            LOG.error("Unhandled exception", t);
        }
    }
}

Caveat

There is no mechanism to avoid low priority tasks to starve if you keep submitting high priority tasks but this will work fine in the use case described above

Test

Here are some unit test to verify that tasks are picked up based on priority:


import io.reactivex.rxjava3.subjects.PublishSubject;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.fail;

/**
 * @author Andrea Vacondio
 *
 */
public class PriorityExecutorTest {
    private PublishSubject values = PublishSubject.create();

    @Test
    @DisplayName("Execution order in case of ExecutorService::submit")
    public void testPrioritySubmit() {
        var victim = new PriorityExecutor(1);
        var testListener = values.test();
        victim.submit(withPriority("A", 10));
        victim.submit(withPriority("B", 10));
        victim.submit(withPriority("E", -7));
        victim.submit(withPriority("C", 5));
        victim.submit(withPriority("D", -10));
        testListener.awaitCount(5)
                    .assertValues("A", "D", "E", "C", "B");
    }

    @Test
    @DisplayName("Execution order in case of Executor::execute")
    public void testPriorityExecute() {
        var victim = new PriorityExecutor(1);
        var testListener = values.test();
        victim.execute(withPriority("A", 10));
        victim.execute(withPriority("B", 10));
        victim.execute(withPriority("C", 5));
        victim.execute(withPriority("D", -10));
        victim.execute(withPriority("E", -7));
        testListener.awaitCount(5)
                    .assertValues("A", "D", "E", "C", "B");
    }

    @Test
    @DisplayName("Execution order in case of Executor::execute mixed with ExecutorService::submit")
    public void testPriorityMixed() {
        var victim = new PriorityExecutor(1);
        var testListener = values.test();
        victim.execute(withPriority("A", 10));
        victim.execute(withPriority("B", 10));
        victim.submit(taskWithPriority("C", 5));
        victim.submit(withPriority("D", -10));
        victim.submit(taskWithPriority("E", -7));
        victim.execute(withPriority("F", -50));
        victim.execute(withPriority("G", 42));
        victim.submit(taskWithPriority("H", 7));
        victim.execute(withPriority("I", -15));
        victim.execute(withPriority("L", 15));
        testListener.awaitCount(10)
                    .assertValues("A", "F", "I", "D", "E", "C", "H", "B", "L", "G");
    }

    private Runnable withPriority(String val, int prio) {
        return WithPriority.withPriority(() -> {
            try {
                Thread.sleep(100);
                values.onNext(val);
            } catch (InterruptedException e) {
                fail(e);
            }

        }, prio);
    }

    private PriorityTask taskWithPriority(String val, int prio) {
        return PriorityTask.withPriority(() -> {
            try {
                Thread.sleep(100);
                values.onNext(val);
            } catch (InterruptedException e) {
                fail(e);
            }

        }, null, prio);
    }
}

Leave a Comment