Java : 특정 큐 크기 이후 제출시 차단되는 ExecutorService
단일 스레드가 병렬로 수행 할 수있는 I / O 집약적 인 작업을 생성하는 솔루션을 코딩하려고합니다. 각 작업에는 중요한 인 메모리 데이터가 있습니다. 따라서 현재 보류중인 작업의 수를 제한 할 수 있기를 원합니다.
다음과 같이 ThreadPoolExecutor를 생성하면 :
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueue));
그런 다음이 executor.submit(callable)발생 RejectedExecutionException큐가 가득하고 모든 스레드가 이미 바쁜 때.
executor.submit(callable)대기열이 가득 차고 모든 스레드가 사용 중일 때 차단 하려면 어떻게해야 합니까?
편집 : 나는 노력 이 :
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
그리고 그것은 내가 원하는 효과를 다소 달성하지만 우아하지 않은 방식으로 달성합니다 (기본적으로 거부 된 스레드는 호출 스레드에서 실행되므로 호출 스레드가 더 많이 제출하지 못하도록 차단합니다).
편집 : (질문을 한 후 5 년)
이 질문과 그 답변을 읽는 사람에게 허용 된 답변을 하나의 올바른 해결책으로 받아들이지 마십시오. 모든 답변과 의견을 읽으십시오.
나는이 같은 일을했습니다. 트릭은 offer () 메서드가 실제로 put () 인 BlockingQueue를 만드는 것입니다. (원하는 기본 BlockingQueue impl을 사용할 수 있습니다).
public class LimitedQueue<E> extends LinkedBlockingQueue<E>
{
public LimitedQueue(int maxSize)
{
super(maxSize);
}
@Override
public boolean offer(E e)
{
// turn offer() and add() into a blocking calls (unless interrupted)
try {
put(e);
return true;
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
}
이것은 스레드 풀에서만 작동 corePoolSize==maxPoolSize하므로 조심하십시오 (주석 참조).
이 문제를 해결 한 방법은 다음과 같습니다.
(참고 :이 솔루션은 Callable을 제출하는 스레드를 차단하므로 RejectedExecutionException이 throw되는 것을 방지합니다.)
public class BoundedExecutor extends ThreadPoolExecutor{
private final Semaphore semaphore;
public BoundedExecutor(int bound) {
super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
semaphore = new Semaphore(bound);
}
/**Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{
semaphore.acquire();
return submit(task);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
semaphore.release();
}
}
현재 허용되는 답변에는 잠재적으로 심각한 문제가 있습니다.이 경우 ThreadPoolExecutor.execute의 동작이 변경되어 corePoolSize < maxPoolSize, ThreadPoolExecutor 논리가 코어를 넘어서는 작업자를 추가하지 않습니다.
에서 가능한 ThreadPoolExecutor의 .Execute를 (Runnable를)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
특히, 마지막 'else'블록은 절대 적중되지 않습니다.
더 나은 대안은 OP가 이미 수행하고있는 것과 유사한 작업 을 수행하는 것입니다. 동일한 논리 를 수행하려면 RejectedExecutionHandler 를 사용하십시오 put.
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
if (!executor.isShutdown()) {
executor.getQueue().put(r);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
}
}
주석에서 지적했듯이이 접근 방식에는 몇 가지주의해야 할 사항이 있습니다 ( 이 답변 참조 ).
- 이면
corePoolSize==0작업이 표시되기 전에 풀의 모든 스레드가 죽을 수있는 경쟁 조건이 있습니다. - 대기열 작업을 래핑하는 구현 (에 적용되지 않음
ThreadPoolExecutor)을 사용하면 핸들러가 동일한 방식으로 래핑하지 않는 한 문제가 발생합니다.
이러한 문제를 염두에두고이 솔루션은 대부분의 일반적인 ThreadPoolExecutors에서 작동하며 corePoolSize < maxPoolSize.
비슷한 문제가 있었고 다음 beforeExecute/afterExecute에서 후크를 사용하여 구현했습니다 ThreadPoolExecutor.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Blocks current task execution if there is not enough resources for it.
* Maximum task count usage controlled by maxTaskCount property.
*/
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
private final ReentrantLock taskLock = new ReentrantLock();
private final Condition unpaused = taskLock.newCondition();
private final int maxTaskCount;
private volatile int currentTaskCount;
public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, int maxTaskCount) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.maxTaskCount = maxTaskCount;
}
/**
* Executes task if there is enough system resources for it. Otherwise
* waits.
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
taskLock.lock();
try {
// Spin while we will not have enough capacity for this job
while (maxTaskCount < currentTaskCount) {
try {
unpaused.await();
} catch (InterruptedException e) {
t.interrupt();
}
}
currentTaskCount++;
} finally {
taskLock.unlock();
}
}
/**
* Signalling that one more task is welcome
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
taskLock.lock();
try {
currentTaskCount--;
unpaused.signalAll();
} finally {
taskLock.unlock();
}
}
}
이 정도면 충분합니다. Btw, 원래 구현은 하나의 작업이 다른 작업보다 100 배 더 클 수 있고 두 개의 거대한 작업을 제출하는 것이 상자를 죽이는 것이었기 때문에 작업 크기 기반이었습니다. 그러나 하나의 크고 작은 작업을 실행하는 것은 괜찮 았습니다. I / O 집약적 인 작업이 대략 같은 크기 인 경우이 클래스를 사용할 수 있습니다. 그렇지 않으면 알려 주시면 크기 기반 구현을 게시하겠습니다.
추신 : ThreadPoolExecutorjavadoc 을 확인하고 싶습니다 . 쉽게 커스터마이징 할 수있는 방법에 대한 Doug Lea의 정말 멋진 사용자 가이드입니다.
나는 이것이 오래된 질문이라는 것을 알고 있지만 새로운 작업을 만드는 것이 매우 빠르며 기존 작업이 충분히 빨리 완료되지 않아 OutOfMemoryError가 너무 많이 발생하면 유사한 문제가 발생했습니다.
내 경우에는 Callables제출하고 난 모든 저장해야하므로 결과를 필요로하는 Futures의해 반환을 executor.submit(). 내 해결책 은를 최대 크기 Futures로 넣는 것 BlockingQueue입니다. 대기열이 가득 차면 일부가 완료 될 때까지 더 이상 작업이 생성되지 않습니다 (요소가 대기열에서 제거됨). 의사 코드에서 :
final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {
Thread taskGenerator = new Thread() {
@Override
public void run() {
while (reader.hasNext) {
Callable task = generateTask(reader.next());
Future future = executor.submit(task);
try {
// if queue is full blocks until a task
// is completed and hence no future tasks are submitted.
futures.put(compoundFuture);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
executor.shutdown();
}
}
taskGenerator.start();
// read from queue as long as task are being generated
// or while Queue has elements in it
while (taskGenerator.isAlive()
|| !futures.isEmpty()) {
Future compoundFuture = futures.take();
// do something
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException ex) {
throw new MyException(ex);
} finally {
executor.shutdownNow();
}
ArrayBlockingQueueaa 대신 a를 사용하는 것만 큼 간단하다고 생각합니다 LinkedBlockingQueue.
날 무시해 ... 그건 완전히 틀렸어. 필요한 효과를 얻을 수 없는 ThreadPoolExecutor호출 .Queue#offerput
대신 해당 호출 ThreadPoolExecutor의 구현을 확장 하고 제공 할 수 있습니다 .execute(Runnable)putoffer
그것은 완전히 만족스러운 대답이 아닌 것 같습니다.
데코레이터 패턴을 따르고 세마포어를 사용하여 실행 된 작업 수를 제어하는 솔루션을 구현했습니다. 다음 Executor과 같이 사용할 수 있습니다 .
- 최대 진행중인 작업 지정
- 작업 실행 허가를 기다리는 최대 제한 시간을 지정합니다 (제한 시간이 지나고 허가를 획득하지 않은 경우 a
RejectedExecutionException가 발생 함).
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import javax.annotation.Nonnull;
public class BlockingOnFullQueueExecutorDecorator implements Executor {
private static final class PermitReleasingDecorator implements Runnable {
@Nonnull
private final Runnable delegate;
@Nonnull
private final Semaphore semaphore;
private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
this.delegate = task;
this.semaphore = semaphoreToRelease;
}
@Override
public void run() {
try {
this.delegate.run();
}
finally {
// however execution goes, release permit for next task
this.semaphore.release();
}
}
@Override
public final String toString() {
return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
}
}
@Nonnull
private final Semaphore taskLimit;
@Nonnull
private final Duration timeout;
@Nonnull
private final Executor delegate;
public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
if (maximumTaskNumber < 1) {
throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
}
this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
if (this.timeout.isNegative()) {
throw new IllegalArgumentException("'maximumTimeout' must not be negative");
}
this.taskLimit = new Semaphore(maximumTaskNumber);
}
@Override
public final void execute(final Runnable command) {
Objects.requireNonNull(command, "'command' must not be null");
try {
// attempt to acquire permit for task execution
if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
}
}
catch (final InterruptedException e) {
// restore interrupt status
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
}
@Override
public final String toString() {
return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
this.timeout, this.delegate);
}
}
'IT TIP' 카테고리의 다른 글
| .NET에 XML 직렬화 가능한 사전이없는 이유는 무엇입니까? (0) | 2020.10.24 |
|---|---|
| 사용자 정의 속성의 생성자는 언제 실행됩니까? (0) | 2020.10.24 |
| 어설 션이 실패 할 때 Python의 unittest에서 계속 (0) | 2020.10.24 |
| 마스터 이외의 브랜치에 대한 GitHub 기여자 그래프를 보는 방법은 무엇입니까? (0) | 2020.10.24 |
| iBeacons의 삼각 측량 예제 (0) | 2020.10.23 |