I found this example on SO. Now I am trying to understand the usage of wait() and notify()/notifyAll(), and why they are needed in this scenario.
    import java.util.LinkedList;
    import java.util.Queue;

    class BlockingQueue<T> {

        private Queue<T> queue = new LinkedList<T>();
        private int capacity;

        public BlockingQueue(int capacity) {
            this.capacity = capacity;
        }

        public synchronized void put(T element) throws InterruptedException {
            // Wait while the queue is full
            while (queue.size() == capacity) {
                System.out.println("waiting...");
                wait();
            }
            queue.add(element);
            notify(); // notifyAll() for multiple producer/consumer threads
        }

        public synchronized T take() throws InterruptedException {
            // Wait while the queue is empty
            while (queue.isEmpty()) {
                wait();
            }
            T item = queue.remove();
            notify(); // notifyAll() for multiple producer/consumer threads
            return item;
        }
    }

So, I implemented Runnable and overrode the run() method as below:
    @Override
    public void run() {
        // synchronized (this) {
        BlockingQueue<Integer> s = new BlockingQueue(10);
        for (int i = 0; i < 12; i++) {
            try {
                s.put(i);
                if (i > 9) {
                    System.out.println(Thread.currentThread().getName() + " : " + s.take());
                }
                System.out.println(Thread.currentThread().getName() + " extendsthread : counter : " + i);
            }
            //}
            //notify();
            catch (InterruptedException ex) {
                Logger.getLogger(ExtendsThread.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

And I am running the thread as below:
    ImplementsRunnable rc = new ImplementsRunnable();
    Thread t1 = new Thread(rc, "a");
    t1.start();

When I'm running it, it gets stuck after counter : 9 and keeps on waiting forever. Can someone suggest what's wrong here?
Your concept is slightly flawed. A BlockingQueue can act as a bridge in a producer/consumer pattern.
That is, it allows one thread to write content to it and another thread to read content from it, and it does so in such a way that:
- If there are no items to be taken, it waits until new items arrive
- If there are too many items, it waits for items to be removed
In this case, wait and notify are internal messaging for the instance of BlockingQueue.
You can have a look at Intrinsic Locks and Synchronization.
So, instead of using just one thread, you should be using (at least) two: a producer and a consumer...
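To make the "internal messaging" idea concrete, here is a minimal sketch of the wait/notify handshake on a shared monitor. The names `ReadyFlag` and `WaitNotifyDemo` are hypothetical and not part of the original example. The point is only that both threads call synchronized methods on the same object, the waiter re-checks its condition in a loop, and the notifier changes the state before notifying.

```java
// Hypothetical one-shot flag; not part of the original example.
class ReadyFlag {
    private boolean ready = false;

    // Blocks the caller until markReady() has been called.
    synchronized void awaitReady() throws InterruptedException {
        while (!ready) {   // the loop guards against spurious wake-ups
            wait();        // releases the lock on "this" while waiting
        }
    }

    // Changes the state first, then wakes every thread waiting on "this".
    synchronized void markReady() {
        ready = true;
        notifyAll();
    }
}

class WaitNotifyDemo {
    // Returns the text written by the waiting thread after it was released.
    static String run() {
        ReadyFlag flag = new ReadyFlag();
        StringBuilder log = new StringBuilder();
        Thread waiter = new Thread(() -> {
            try {
                flag.awaitReady();
                log.append("released");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "waiter");
        waiter.start();
        flag.markReady();      // same instance the waiter synchronizes on
        try {
            waiter.join();     // join() makes the waiter's write visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If the two threads used different `ReadyFlag` instances, the waiter would never be woken, which is why the producer and consumer must be handed the same queue object.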
Producer
This takes an instance of BlockingQueue and adds int values to it. Each time, it stops for 1 second before adding the next.
    public class Producer implements Runnable {

        private BlockingQueue<Integer> queue;

        public Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int index = 0; index < 10; index++) {
                try {
                    System.out.println("put " + index);
                    queue.put(index);
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                }
            }
        }
    }

Consumer
The consumer takes a BlockingQueue and reads the int values from it, being blocked until a value exists.
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class Consumer implements Runnable {

        private BlockingQueue<Integer> queue;

        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    // Blocks until a value is available
                    Integer value = queue.take();
                    System.out.println("took " + value);
                }
            } catch (InterruptedException ex) {
                Logger.getLogger(JavaApplication220.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

You can start these using something like...
    BlockingQueue<Integer> bq = new BlockingQueue<>(10);
    Thread p = new Thread(new Producer(bq));
    Thread c = new Thread(new Consumer(bq));
    c.setDaemon(true); // the consumer loops forever, so don't let it keep the JVM alive
    c.start();
    p.start();

You should note that there is a small delay between the put messages, but no delay between the took messages. This is the queue in action. The consumer is blocking/waiting on the queue to have something to give it.
You can play around with the Producer and Consumer, maybe changing the times (having a longer delay in the Consumer before taking an item, for example) to see how this might cause different effects.
"When I'm running it, it gets stuck after counter : 9 and keeps on waiting forever"
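This is because you've exceeded the capacity of the queue and its put method is blocking until something takes an item from it (you basically have a deadlock: the queue is waiting for you to take something from it, but you can't, because the only thread you have is blocked in put).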
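The hang can be reproduced in isolation. The sketch below is an illustration under stated assumptions, not the original code: `BoundedQueue` is a compact copy of the question's queue under a hypothetical name, and `StuckProducerDemo` is made up for this example. A single thread tries to put 11 items into a queue with capacity 10 and ends up parked in wait(). It can only finish once a second thread (here, the main thread) takes an item.

```java
import java.util.LinkedList;
import java.util.Queue;

// Compact copy of the question's queue under a hypothetical name.
class BoundedQueue<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;

    BoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T element) throws InterruptedException {
        while (queue.size() == capacity) {
            wait(); // full: wait for a take() on this same instance
        }
        queue.add(element);
        notifyAll();
    }

    synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // empty: wait for a put() on this same instance
        }
        T item = queue.remove();
        notifyAll();
        return item;
    }
}

class StuckProducerDemo {
    // Returns {blockedWhenFull, finishedAfterTake}.
    static boolean[] run() {
        BoundedQueue<Integer> queue = new BoundedQueue<>(10);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 11; i++) {
                    queue.put(i); // the 11th put cannot complete on its own
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();
        try {
            // Poll (for at most 2 seconds) until the producer is parked in wait().
            long deadline = System.currentTimeMillis() + 2000;
            while (producer.getState() != Thread.State.WAITING
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            boolean blocked = producer.getState() == Thread.State.WAITING;
            queue.take();        // a different thread frees one slot
            producer.join(2000); // the producer can now finish
            return new boolean[] {blocked, !producer.isAlive()};
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("blocked when full: " + result[0]);
        System.out.println("finished after take: " + result[1]);
    }
}
```

In the question's code no second thread exists, so nothing ever calls take() while the only thread is waiting inside put().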
Things to remember:
- For two or more threads to work with monitor locks, they must share the same instance of the monitor/object lock. In this case, the same instance of the BlockingQueue.
- notify will wake one thread waiting on the same instance of the monitor lock's wait method. There is no way to know which one. This can be useful if you have multiple consumers but don't care about the order in which the data is processed, for example.
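Both points can be seen in a small sketch. The names below (`SharedQueue`, `MultiConsumerDemo` and the `STOP` marker) are hypothetical and only serve the illustration. Two consumers and one producer share a single queue instance, so they all synchronize on the same monitor. The sketch uses notifyAll() rather than notify(), following the comment in the original queue about multiple producer/consumer threads. Which consumer receives a given value is not defined, but the combined total is the same either way.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the queue that always uses notifyAll().
class SharedQueue<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;

    SharedQueue(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T element) throws InterruptedException {
        while (queue.size() == capacity) {
            wait();
        }
        queue.add(element);
        notifyAll(); // wake every waiter; each one re-checks its own condition
    }

    synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        T item = queue.remove();
        notifyAll();
        return item;
    }
}

class MultiConsumerDemo {
    static final int STOP = -1; // hypothetical marker telling a consumer to exit

    // Returns the sum of all values taken by both consumers.
    static int run() {
        SharedQueue<Integer> queue = new SharedQueue<>(2);
        AtomicInteger total = new AtomicInteger();
        Runnable consumer = () -> {
            try {
                while (true) {
                    int value = queue.take();
                    if (value == STOP) {
                        return;
                    }
                    total.addAndGet(value);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };
        // Both consumers are handed the SAME queue instance.
        Thread c1 = new Thread(consumer, "consumer-1");
        Thread c2 = new Thread(consumer, "consumer-2");
        c1.start();
        c2.start();
        try {
            for (int i = 0; i < 100; i++) {
                queue.put(i);
            }
            queue.put(STOP); // one stop marker per consumer
            queue.put(STOP);
            c1.join();
            c2.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println("total = " + run());
    }
}
```

With a single shared monitor, a plain notify() issued by one consumer could wake the other consumer instead of the producer, which is one reason notifyAll() is the safer default once more than two threads are involved.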
Updated with additional example
So, this takes the Thread.sleep out of the Producer (and allows the producer to produce 100 values) and adds a Thread.sleep to the Consumer.
This way, the Producer will reach the queue's capacity before the Consumer can drain it, forcing it to wait until the Consumer can take values from it...
    public class Producer implements Runnable {

        private BlockingQueue<Integer> queue;

        public Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int index = 0; index < 100; index++) {
                try {
                    System.out.println("put " + index);
                    queue.put(index);
                } catch (InterruptedException ex) {
                }
            }
        }
    }

    public class Consumer implements Runnable {

        private BlockingQueue<Integer> queue;

        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer value = queue.take();
                    System.out.println("took " + value);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ex) {
            }
        }
    }