Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion nylon-threads/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ dependencies {
implementation 'org.jboss.threads:jboss-threads'

testImplementation 'com.google.guava:guava'
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation 'com.palantir.safe-logging:preconditions-assertj'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
testImplementation 'org.junit.jupiter:junit-jupiter'

jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess'
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.palantir.nylon.threads;

import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import java.util.Objects;
import java.util.OptionalInt;
Expand Down Expand Up @@ -54,6 +55,13 @@ public interface MaxThreadsStage extends BuildStage {
* Executors factory.
*/
QueueSizeStage maxThreads(int maxThreads);

/**
* Configures the maximum threads allowed for concurrent execution through this executor facade. If no value is
* specified, the default is {@link Integer#MAX_VALUE}, matching the standard-library
* Executors factory.
*/
QueueSizeStage maxThreads(OptionalInt maxThreads);
}

public interface QueueSizeStage extends BuildStage {
Expand Down Expand Up @@ -103,11 +111,17 @@ public MaxThreadsStage executor(Executor value) {

@Override
public QueueSizeStage maxThreads(int value) {
return maxThreads(OptionalInt.of(value));
}

@Override
public QueueSizeStage maxThreads(OptionalInt value) {
Preconditions.checkState(maxThreads.isEmpty(), "maxThreads has already been configured");
if (value <= 0) {
throw new SafeIllegalArgumentException("maxThreads must be positive");
if (value.isPresent() && value.getAsInt() <= 0) {
throw new SafeIllegalArgumentException(
"maxThreads must be positive", SafeArg.of("maxThreads", value.getAsInt()));
}
maxThreads = OptionalInt.of(value);
maxThreads = value;
Comment on lines +114 to +124

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does either of these actually limit the maximum number of threads created for this executor to concurrently execute tasks?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sample failing test:

    @ParameterizedTest
    @ValueSource(ints = {1, 2, 3, 4, 5})
    void testMaxThreadsLimitsTotalThreads(int maxThreads) {
        AtomicInteger threadsCreated = new AtomicInteger();
        ExecutorService delegate = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("test-%d")
                .setThreadFactory(r -> {
                    int id = threadsCreated.incrementAndGet();
                    Thread thread = Executors.defaultThreadFactory().newThread(r);
                    thread.setName("thread-" + id);
                    return thread;
                })
                .build());
        try {
            CountDownLatch countDownLatch = new CountDownLatch(maxThreads);
            int tasks = 3 * maxThreads;
            List<ListenableFuture<String>> futures = new ArrayList<>(tasks);
            List<Long> threadIds = Collections.synchronizedList(new ArrayList<>(maxThreads));
            ListeningExecutorService executor = MoreExecutors.listeningDecorator(NylonExecutor.builder()
                    .name("foo")
                    .executor(delegate)
                    .maxThreads(maxThreads)
                    .build());

            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> {
                    countDownLatch.countDown();
                    countDownLatch.await();
                    Thread thread = Thread.currentThread();
                    threadIds.add(thread.getId());
                    return thread.getName();
                }));
            }

            assertThat(Futures.successfulAsList(futures))
                    .succeedsWithin(Duration.ofSeconds(10))
                    .asInstanceOf(InstanceOfAssertFactories.list(String.class))
                    .hasSize(tasks)
                    .allSatisfy(value -> assertThat(value).startsWith("foo-"));

            assertThat(threadsCreated.get())
                    .as("should create at most %s threads", maxThreads)
                    .isLessThanOrEqualTo(maxThreads);
            assertThat(Set.copyOf(threadIds)).hasSizeLessThanOrEqualTo(maxThreads);

            assertThat(delegate)
                    .asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
                    .satisfies(threadPoolExecutor -> {
                        assertThat(threadPoolExecutor.getCorePoolSize()).isLessThanOrEqualTo(maxThreads);
                        assertThat(threadPoolExecutor.getPoolSize()).isLessThanOrEqualTo(maxThreads);
                        assertThat(threadPoolExecutor.getMaximumPoolSize()).isLessThanOrEqualTo(maxThreads);
                        assertThat(threadPoolExecutor.getCompletedTaskCount()).isEqualTo(tasks);
                    });
        } finally {
            assertThat(MoreExecutors.shutdownAndAwaitTermination(delegate, Duration.ofSeconds(1)))
                    .as("Delegate failed to stop")
                    .isTrue();
        }
    }

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that test counts how many threads are created (identified by Thread.id()), but I think Nylon and the backing org.jboss.threads.EnhancedViewExecutor are capping how many are simultaneously running.

This test passes:

    @ParameterizedTest
    @ValueSource(ints = {1, 2, 3, 4, 5})
    void testMaxThreadsLimitsTotalThreads(int maxThreads) {
        AtomicInteger threadsCreated = new AtomicInteger();
        ExecutorService delegate = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("test-%d")
                .setThreadFactory(r -> {
                    int id = threadsCreated.incrementAndGet();
                    Thread thread = Executors.defaultThreadFactory().newThread(r);
                    thread.setName("thread-" + id);
                    return thread;
                })
                .build());
        try {
            CountDownLatch countDownLatch = new CountDownLatch(maxThreads);
            int tasks = 3 * maxThreads;
            List<ListenableFuture<String>> futures = new ArrayList<>(tasks);
            List<Long> threadIds = Collections.synchronizedList(new ArrayList<>(maxThreads));
            List<Instant> threadStarts = Collections.synchronizedList(new ArrayList<>(maxThreads));
            List<Instant> threadEnds = Collections.synchronizedList(new ArrayList<>(maxThreads));
            ListeningExecutorService executor = MoreExecutors.listeningDecorator(NylonExecutor.builder()
                    .name("foo")
                    .executor(delegate)
                    .maxThreads(maxThreads)
                    .build());
            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> {
                    threadStarts.add(Instant.now());
                    countDownLatch.countDown();
                    countDownLatch.await();
                    Thread thread = Thread.currentThread();
                    threadIds.add(thread.getId());
                    threadEnds.add(Instant.now());
                    return thread.getName();
                }));
            }
            assertThat(Futures.successfulAsList(futures))
                    .succeedsWithin(Duration.ofSeconds(10))
                    .asInstanceOf(InstanceOfAssertFactories.list(String.class))
                    .hasSize(tasks)
                    .allSatisfy(value -> assertThat(value).startsWith("foo-"));
            ThreadPeak threadPeak = peakThreads(threadStarts, threadEnds);
            assertThat(threadPeak.peak)
                    .as(
                            "should have at most %s threads running at once. Starts: %s. Ends: %s. List: %s",
                            maxThreads, threadStarts, threadEnds, threadPeak.description)
                    .isLessThanOrEqualTo(maxThreads);
            // assertThat(threadsCreated.get())
            //   .as("should create at most %s threads. Thread ids: %s", maxThreads, threadIds)
            //   .isLessThanOrEqualTo(maxThreads);
            // assertThat(Set.copyOf(threadIds)).hasSizeLessThanOrEqualTo(maxThreads);
            assertThat(delegate)
                    .asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
                    .satisfies(threadPoolExecutor -> {
                        // assertThat(threadPoolExecutor.getCorePoolSize()).isLessThanOrEqualTo(maxThreads);
                        // assertThat(threadPoolExecutor.getPoolSize()).isLessThanOrEqualTo(maxThreads);
                        // assertThat(threadPoolExecutor.getMaximumPoolSize()).isLessThanOrEqualTo(maxThreads);
                        assertThat(threadPoolExecutor.getCompletedTaskCount()).isEqualTo(tasks);
                    });
        } finally {
            assertThat(MoreExecutors.shutdownAndAwaitTermination(delegate, Duration.ofSeconds(1)))
                    .as("Delegate failed to stop")
                    .isTrue();
        }
    }

    record ThreadPeak(int peak, String description) {}

    /**
     * The maximum number of threads that were running at once. A thread is counted as running if it has a start
     * instant but not yet an end instant.
     */
    private static ThreadPeak peakThreads(List<Instant> starts, List<Instant> ends) {
        // direction is +1 for start or -1 for ends
        record Tick(Instant instant, int direction) {}

        List<Tick> ticks = Streams.concat(
                        starts.stream().map(instant -> new Tick(instant, 1)),
                        ends.stream().map(instant -> new Tick(instant, -1)))
                .sorted(Comparator.comparing(Tick::instant))
                .toList();

        int running = 0;
        int max = 0;
        for (Tick tick : ticks) {
            running += tick.direction;
            max = Math.max(max, running);
        }
        return new ThreadPeak(max, ticks.toString());
    }

So for trying to cap memory usage from created thread stacks, maxThreads is not helpful

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

package com.palantir.nylon.threads;

import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.logsafe.SafeArg;
import java.time.Duration;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -224,4 +228,44 @@ void testMaxQueueSize() throws InterruptedException {
.isTrue();
}
}

@Test
void testMaxThreads() {
ExecutorService delegate = Executors.newCachedThreadPool();
try {
assertThatCode(() -> NylonExecutor.builder()
.name("foo")
.executor(delegate)
.maxThreads(OptionalInt.of(1))
.build())
.doesNotThrowAnyException();

assertThatCode(() -> NylonExecutor.builder()
.name("foo")
.executor(delegate)
.maxThreads(OptionalInt.empty())
.build())
.doesNotThrowAnyException();

assertThatLoggableExceptionThrownBy(() -> NylonExecutor.builder()
.name("foo")
.executor(delegate)
.maxThreads(-1)
.build())
.hasMessageContaining("maxThreads must be positive")
.hasExactlyArgs(SafeArg.of("maxThreads", -1));

assertThatLoggableExceptionThrownBy(() -> NylonExecutor.builder()
.name("foo")
.executor(delegate)
.maxThreads(OptionalInt.of(-1))
.build())
.hasMessageContaining("maxThreads must be positive")
.hasExactlyArgs(SafeArg.of("maxThreads", -1));
} finally {
assertThat(MoreExecutors.shutdownAndAwaitTermination(delegate, Duration.ofSeconds(1)))
.as("Delegate failed to stop")
.isTrue();
}
}
}
8 changes: 5 additions & 3 deletions versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ com.palantir.safe-logging:logger-slf4j:3.9.0 (1 constraints: 070e6e42)

com.palantir.safe-logging:logger-spi:3.9.0 (2 constraints: 1d1e0c7c)

com.palantir.safe-logging:preconditions:3.9.0 (1 constraints: 0e051536)
com.palantir.safe-logging:preconditions:3.9.0 (2 constraints: 3e191db6)

com.palantir.safe-logging:safe-logging:3.9.0 (4 constraints: 9a33d562)
com.palantir.safe-logging:safe-logging:3.9.0 (5 constraints: ca472212)

io.smallrye.common:smallrye-common-annotation:2.12.0 (1 constraints: f00d0344)

Expand Down Expand Up @@ -50,6 +50,8 @@ com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava (1 c

com.google.j2objc:j2objc-annotations:3.0.0 (1 constraints: 150aeab4)

com.palantir.safe-logging:preconditions-assertj:3.9.0 (1 constraints: 0e051536)

net.bytebuddy:byte-buddy:1.15.11 (1 constraints: 7f0bc9ea)

net.sf.jopt-simple:jopt-simple:5.0.4 (1 constraints: be0ad6cc)
Expand All @@ -58,7 +60,7 @@ org.apache.commons:commons-math3:3.6.1 (1 constraints: bf0adbcc)

org.apiguardian:apiguardian-api:1.1.2 (6 constraints: 5366ce6e)

org.assertj:assertj-core:3.27.4 (1 constraints: 4205523b)
org.assertj:assertj-core:3.27.4 (2 constraints: a5195ae0)

org.awaitility:awaitility:4.3.0 (1 constraints: 09050836)

Expand Down