programing

take ()에서 차단하는 BlockingQueue를 중단하는 방법은 무엇입니까?

nasanasas 2020. 10. 8. 08:05
반응형

take ()에서 차단하는 BlockingQueue를 중단하는 방법은 무엇입니까?


연속 루프에서 BlockingQueue호출 take()하여 a 에서 객체를 가져와 처리 하는 클래스가 있습니다 . 어느 시점에서 더 이상 개체가 대기열에 추가되지 않는다는 것을 알고 있습니다. 방법을 중단하여 take()차단을 중지 하려면 어떻게합니까 ?

객체를 처리하는 클래스는 다음과 같습니다.

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

이 클래스를 사용하여 객체를 처리하는 방법은 다음과 같습니다.

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}

스레드 중단이 옵션이 아닌 경우 다른 방법은 MyObjHandler에 의해 인식되고 루프를 벗어나는 큐에 "마커"또는 "명령"개체를 배치하는 것입니다.


BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

그러나 이렇게하면 처리 대기중인 항목이 대기열에있는 동안 스레드가 중단 될 수 있습니다. poll대신 사용 하는 것을 고려할 수 있습니다. 그러면 take처리 스레드가 시간 초과되고 새 입력없이 잠시 기다렸을 때 종료됩니다.


매우 늦었지만 비슷한 문제에 직면하고 위의 erickson이poll 제안한 접근 방식을 약간 변경하여 사용 했기 때문에 이것이 다른 사람들에게도 도움이 되기를 바랍니다 .

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

이것은 두 문제를 모두 해결했습니다.

  1. BlockingQueue더 이상 요소를 기다릴 필요가 없음을 알 수 있도록에 플래그 를 지정했습니다.
  2. 중간에 중단되지 않았으므로 대기열의 모든 항목이 처리되고 추가 할 항목이 남아 있지 않을 때만 처리 블록이 종료됩니다.

스레드 중단 :

thread.interrupt()

아니면 방해하지 마십시오.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. when producer puts object to the queue, call queue.notify(), if it ends, call queue.done()
  2. loop while (!queue.isDone() || !queue.isEmpty())
  3. test take() return value for null

참고URL : https://stackoverflow.com/questions/812342/how-to-interrupt-a-blockingqueue-which-is-blocking-on-take

반응형