Program Tip

ExecutorService를 통해 CompletionService를 언제 사용해야합니까?

programtip 2020. 10. 22. 22:21
반응형

ExecutorService를 통해 CompletionService를 언제 사용해야합니까?


방금 이 블로그 게시물 에서 CompletionService를 찾았 습니다 . 그러나 이것은 표준 ExecutorService에 비해 CompletionService의 장점을 실제로 보여주지는 않습니다. 둘 중 하나를 사용하여 동일한 코드를 작성할 수 있습니다. 그렇다면 CompletionService는 언제 유용할까요?

명확하게하기 위해 짧은 코드 샘플을 제공 할 수 있습니까? 예를 들어,이 코드 샘플은 CompletionService가 필요하지 않은 위치 (= ExecutorService와 동일) 만 보여줍니다.

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    //        CompletionService<Long> taskCompletionService =
    //                new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
        taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

를 사용하면 ExecutorService실행할 작업을 제출 한 후에는 완료된 작업 결과를 효율적으로 가져 오기 위해 수동으로 코딩해야합니다.

를 사용 CompletionService하면 이것은 거의 자동화됩니다. 하나의 작업 만 제출하기 때문에 제시 한 코드에서는 그 차이가 분명하지 않습니다. 그러나 제출할 작업 목록이 있다고 가정합니다. 아래 예에서는 여러 작업이 CompletionService에 제출됩니다. 그런 다음 (결과를 얻기 위해) 완료된 작업을 찾는 대신 CompletionService 인스턴스에 결과가 사용 가능 해지면 반환하도록 요청합니다.

public class CompletionServiceTest {

        class CalcResult {
             long result ;

             CalcResult(long l) {
                 result = l;
             }
        }

        class CallableTask implements Callable<CalcResult> {
            String taskName ;
            long  input1 ;
            int input2 ;

            CallableTask(String name , long v1 , int v2 ) {
                taskName = name;
                input1 = v1;
                input2 = v2 ;
            }

            public CalcResult call() throws Exception {
                System.out.println(" Task " + taskName + " Started -----");
                for(int i=0;i<input2 ;i++) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        System.out.println(" Task " + taskName + " Interrupted !! ");
                        e.printStackTrace();
                    }
                    input1 += i;
                }
                System.out.println(" Task " + taskName + " Completed @@@@@@");
                return new CalcResult(input1) ;
            }

        }

        public void test(){
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);

            int submittedTasks = 5;
            for (int i=0;i< submittedTasks;i++) {
                taskCompletionService.submit(new CallableTask (
                        String.valueOf(i), 
                            (i * 10), 
                            ((i * 10) + 10  )
                        ));
               System.out.println("Task " + String.valueOf(i) + "subitted");
            }
            for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
                try {
                    System.out.println("trying to take from Completion service");
                    Future<CalcResult> result = taskCompletionService.take();
                    System.out.println("result for a task availble in queue.Trying to get()");
                    // above call blocks till atleast one task is completed and results availble for it
                    // but we dont have to worry which one

                    // process the result here by doing result.get()
                    CalcResult l = result.get();
                    System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));

                } catch (InterruptedException e) {
                    // Something went wrong with a task submitted
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Something went wrong with the result
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }
            }
        }
    }

많은 세부 사항 생략 :

  • ExecutorService = 들어오는 큐 + 작업자 스레드
  • CompletionService = 수신 대기열 + 작업자 스레드 + 출력 대기열

나는 javadoc이 언제 CompletionService유용 ExecutorService하지 않은지 대한 질문에 가장 잘 대답한다고 생각합니다 .

완료된 작업의 결과 소비에서 새로운 비동기 작업의 생산을 분리하는 서비스입니다.

기본적으로이 인터페이스를 사용하면 프로그램이 해당 작업의 결과에 대한 다른 소비자에 대해 알지 못해도 작업을 만들고 제출하는 (그리고 해당 제출의 결과를 조사하는) 생산자를 가질 수 있습니다. 한편, 작업을 제출하는 생산자에 대해 알지 못하는 상태 에서 CompletionServicepoll또는 take결과 알고있는 소비자 .

기록을 위해, 다소 늦기 때문에 틀릴 수 있지만 그 블로그 게시물의 샘플 코드가 메모리 누수를 일으킨다는 것은 상당히 확신합니다. 적극적인 소비자가 ExecutorCompletionService의 내부 대기열에서 결과를 가져 오지 않으면 블로거가 해당 대기열이 어떻게 배출 될 것으로 예상했는지 확신 할 수 없습니다.


기본적으로 CompletionService여러 작업을 병렬로 실행 한 다음 완료 순서대로 작업 하려면 a를 사용합니다 . 그래서 제가 5 개의 작업을 실행하면 CompletionService에서 첫 번째 작업 이 완료됩니다. 작업이 하나 뿐인 예 ExecutorCallable.


우선, 프로세서 시간을 낭비하지 않으려면

while (!future.isDone()) {
        // Do some work...
}

우리는 사용해야합니다

service.shutdown();
service.awaitTermination(14, TimeUnit.DAYS);

이 코드의 나쁜 점은 종료된다는 것입니다 ExecutorService. 작업을 계속하려면 (예 : 반복 작업 생성이 있음) invokeAll 또는 ExecutorService.

invokeAll모든 작업이 완료 될 때까지 기다립니다. ExecutorService결과를 하나씩 가져 오거나 투표 할 수있는 능력을 부여합니다.

그리고 마지막으로 재귀적인 예 :

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);

while (Tasks.size() > 0) {
    for (final Task task : Tasks) {
        completionService.submit(new Callable<String>() {   
            @Override
            public String call() throws Exception {
                return DoTask(task);
            }
        });
    } 

    try {                   
        int taskNum = Tasks.size();
        Tasks.clear();
        for (int i = 0; i < taskNum; ++i) {
            Result result = completionService.take().get();
            if (result != null)
                Tasks.add(result.toTask());
        }           
    } catch (InterruptedException e) {
    //  error :(
    } catch (ExecutionException e) {
    //  error :(
    }
}

런타임에 직접 확인하고 두 솔루션 (Executorservice 및 Completionservice)을 구현해보십시오. 그러면 두 솔루션이 어떻게 다른지 알 수 있으며 둘 중 하나를 사용할시기가 더 명확해질 것입니다. http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html 을 원한다면 여기에 예가 있습니다.


5 개의 장기 실행 작업 (호출 가능한 작업)이 있고 해당 작업을 실행자 서비스에 제출했다고 가정 해 보겠습니다. 이제 5 개 작업이 모두 경쟁 할 때까지 기다리지 않고 하나가 완료되면 이러한 작업에 대해 일종의 처리를하고 싶다고 상상해보십시오. 이제 향후 개체에 폴링 논리를 작성하거나이 API를 사용하여이 작업을 수행 할 수 있습니다.


completeservice를 사용하는 또 다른 이점이 있습니다. 성능

을 (를) 호출 future.get()하면 회전 대기 상태가됩니다.

...에서 java.util.concurrent.CompletableFuture

  private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }

오래 실행되는 작업이 있으면 성능에 재앙이 될 것입니다.

완료 서비스를 사용하면 작업이 완료되면 결과가 대기열에 추가되고 낮은 성능으로 대기열을 폴링 할 수 있습니다.

완료 서비스는 done후크 가있는 랩 작업을 사용하여이를 달성합니다 .

java.util.concurrent.ExecutorCompletionService

    private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

작업 생산자가 결과에 관심이없고 executor 서비스에 의해 실행되는 비동기 작업의 결과를 처리하는 것이 다른 구성 요소의 책임이라면 CompletionService를 사용해야합니다. 작업 생성자와 작업 결과 프로세서를 분리하는 데 도움이됩니다. http://www.zoftino.com/java-concurrency-executors-framework-tutorial 예제 참조


package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorCompletest00 {

    public static void main(String[] args) {

        ExecutorService exc= Executors.newFixedThreadPool( 10 );
        ExecutorCompletionService executorCompletionService= new ExecutorCompletionService( exc );

        for (int i=1;i<10;i++){
            Task00 task00= new Task00( i );
            executorCompletionService.submit( task00 );
        }
        for (int i=1;i<20;i++){
            try {
                Future<Integer> future= (Future <Integer>) executorCompletionService.take();
                Integer inttest=future.get();
                System.out.println(" the result of completion service is "+inttest);

               break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

================================================ =====

package com.barcap.test.test00;

import java.util.*;
import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorServ00 {

    public static void main(String[] args) {
        ExecutorService executorService=Executors.newFixedThreadPool( 9 );
        List<Future> futList= new ArrayList <>(  );
        for (int i=1;i<10;i++) {
           Future result= executorService.submit( new Task00( i ) );
           futList.add( result );
        }

         for (Future<Integer> futureEach :futList ){
             try {
              Integer inm=   futureEach.get();

                 System.out.println("the result of future executorservice is "+inm);
                 break;
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (ExecutionException e) {
                 e.printStackTrace();
             }
         }
    }
}

================================================ =========

package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class Task00 implements Callable<Integer> {

    int i;

    public Task00(int i) {
        this.i = i;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println(" the current thread is "+Thread.currentThread().getName()  +" the result should be "+i);
        int sleepforsec=100000/i;
         Thread.sleep( sleepforsec );
        System.out.println(" the task complted for "+Thread.currentThread().getName()  +" the result should be "+i);



        return i;
    }
}

======================================================================

difference of logs for executor completion service: the current thread is pool-1-thread-1 the result should be 1 the current thread is pool-1-thread-2 the result should be 2 the current thread is pool-1-thread-3 the result should be 3 the current thread is pool-1-thread-4 the result should be 4 the current thread is pool-1-thread-6 the result should be 6 the current thread is pool-1-thread-5 the result should be 5 the current thread is pool-1-thread-7 the result should be 7 the current thread is pool-1-thread-9 the result should be 9 the current thread is pool-1-thread-8 the result should be 8 the task complted for pool-1-thread-9 the result should be 9 teh result is 9 the task complted for pool-1-thread-8 the result should be 8 the task complted for pool-1-thread-7 the result should be 7 the task complted for pool-1-thread-6 the result should be 6 the task complted for pool-1-thread-5 the result should be 5 the task complted for pool-1-thread-4 the result should be 4 the task complted for pool-1-thread-3 the result should be 3

the task complted for pool-1-thread-2 the result should be 2

the current thread is pool-1-thread-1 the result should be 1 the current thread is pool-1-thread-3 the result should be 3 the current thread is pool-1-thread-2 the result should be 2 the current thread is pool-1-thread-5 the result should be 5 the current thread is pool-1-thread-4 the result should be 4 the current thread is pool-1-thread-6 the result should be 6 the current thread is pool-1-thread-7 the result should be 7 the current thread is pool-1-thread-8 the result should be 8 the current thread is pool-1-thread-9 the result should be 9 the task complted for pool-1-thread-9 the result should be 9 the task complted for pool-1-thread-8 the result should be 8 the task complted for pool-1-thread-7 the result should be 7 the task complted for pool-1-thread-6 the result should be 6 the task complted for pool-1-thread-5 the result should be 5 the task complted for pool-1-thread-4 the result should be 4 the task complted for pool-1-thread-3 the result should be 3 the task complted for pool-1-thread-2 the result should be 2 the task complted for pool-1-thread-1 the result should be 1 the result of future is 1

=======================================================

for executorservice the result will only be avialable after all tasks complted.

executor completionservice any result avilable make that return.

참고URL : https://stackoverflow.com/questions/4912228/when-should-i-use-a-completionservice-over-an-executorservice

반응형