반응형
take ()에서 차단하는 BlockingQueue를 중단하는 방법은 무엇입니까?
연속 루프에서 BlockingQueue
호출 take()
하여 a 에서 객체를 가져와 처리 하는 클래스가 있습니다 . 어느 시점에서 더 이상 개체가 대기열에 추가되지 않는다는 것을 알고 있습니다. 방법을 중단하여 take()
차단을 중지 하려면 어떻게합니까 ?
객체를 처리하는 클래스는 다음과 같습니다.
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
이 클래스를 사용하여 객체를 처리하는 방법은 다음과 같습니다.
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
스레드 중단이 옵션이 아닌 경우 다른 방법은 MyObjHandler에 의해 인식되고 루프를 벗어나는 큐에 "마커"또는 "명령"개체를 배치하는 것입니다.
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
thread.interrupt();
그러나 이렇게하면 처리 대기중인 항목이 대기열에있는 동안 스레드가 중단 될 수 있습니다. poll
대신 사용 하는 것을 고려할 수 있습니다. 그러면 take
처리 스레드가 시간 초과되고 새 입력없이 잠시 기다렸을 때 종료됩니다.
매우 늦었지만 비슷한 문제에 직면하고 위의 erickson이poll
제안한 접근 방식을 약간 변경하여 사용 했기 때문에 이것이 다른 사람들에게도 도움이 되기를 바랍니다 .
class MyObjHandler implements Runnable
{
private final BlockingQueue<MyObj> queue;
public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
public MyObjHandler(BlockingQueue queue)
{
this.queue = queue;
Finished = false;
}
@Override
public void run()
{
while (true)
{
try
{
MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
{
// process obj here
// ...
}
if(Finished && queue.isEmpty())
return;
}
catch (InterruptedException e)
{
return;
}
}
}
}
public void testHandler()
{
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
{
queue.put(i.next());
}
// what code should go here to tell the handler to stop waiting for more objects?
handler.Finished = true; //THIS TELLS HIM
//If you need you can wait for the termination otherwise remove join
myThread.join();
}
이것은 두 문제를 모두 해결했습니다.
BlockingQueue
더 이상 요소를 기다릴 필요가 없음을 알 수 있도록에 플래그 를 지정했습니다.- 중간에 중단되지 않았으므로 대기열의 모든 항목이 처리되고 추가 할 항목이 남아 있지 않을 때만 처리 블록이 종료됩니다.
스레드 중단 :
thread.interrupt()
아니면 방해하지 마십시오.
public class MyQueue<T> extends ArrayBlockingQueue<T> {
private static final long serialVersionUID = 1L;
private boolean done = false;
public ParserQueue(int capacity) { super(capacity); }
public void done() { done = true; }
public boolean isDone() { return done; }
/**
* May return null if producer ends the production after consumer
* has entered the element-await state.
*/
public T take() throws InterruptedException {
T el;
while ((el = super.poll()) == null && !done) {
synchronized (this) {
wait();
}
}
return el;
}
}
- when producer puts object to the queue, call
queue.notify()
, if it ends, callqueue.done()
- loop while (!queue.isDone() || !queue.isEmpty())
- test take() return value for null
반응형
'programing' 카테고리의 다른 글
sed 오류 : "`s '명령의 RHS에 대한 잘못된 참조 \ 1" (0) | 2020.10.08 |
---|---|
Android 빌드 도구 25.1.6 GCM / FCM으로 업데이트 한 후 IncompatibleClassChangeError (0) | 2020.10.08 |
Android Studio-단일 창에서 여러 프로젝트를 여는 방법은 무엇입니까? (0) | 2020.10.08 |
Golang 미사용 가져 오기 오류를 비활성화하는 방법 (0) | 2020.10.08 |
.NET 애플리케이션 도메인이란 무엇입니까? (0) | 2020.10.08 |