COSC 3P91 - Lab Test Example
In this lab test, you will implement a multi-stage pipeline using multiple threads. The pipeline will consist of:
- A Producer thread that generates random numbers.
- A Processor thread that doubles the numbers.
- A Consumer thread that prints only prime numbers.
- An Observer that logs updates whenever a prime number is printed.
You must ensure proper thread synchronization using synchronized, wait(), and notifyAll().
Instructions
1. Implement a Thread-Safe Queue
Create a generic class PipelineQueue<E> that acts as a thread-safe queue using the Guarded Suspension pattern.
- Implement the following methods:
    public void push(E element);
    public E pull();
    public void close();
- push(E element): Adds an element to the tail of the queue and notifies waiting threads.
- pull(): Blocks while the queue is empty and still open, then removes and returns the oldest element.
- close(): Marks the queue as inactive and wakes any waiting threads. After close(), pull() still returns the remaining elements and returns null once the queue is empty.
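As a minimal sketch of the Guarded Suspension idiom on its own, the hypothetical single-slot class below shows the three ingredients the queue needs: a synchronized method, a while loop that re-checks the guard after every wake-up, and notifyAll() whenever the guarded state changes. GuardedSlot and GuardedSlotDemo are illustrative names, not part of the lab.

```java
// Hypothetical single-slot holder used only to illustrate guarded suspension.
class GuardedSlot<E> {
    private E value; // null means "empty"; the guard condition is (value != null)

    // Stores a value (overwriting any previous one) and wakes all waiting threads.
    public synchronized void put(E newValue) {
        value = newValue;
        notifyAll();
    }

    // Waits until a value is present, then removes and returns it.
    public synchronized E take() throws InterruptedException {
        while (value == null) { // a loop, not an if: re-check after each wake-up
            wait();             // releases the monitor while waiting
        }
        E result = value;
        value = null;
        return result;
    }
}

class GuardedSlotDemo {
    // Starts a taker thread, supplies one value, and returns what the taker received.
    static String runOnce() {
        GuardedSlot<String> slot = new GuardedSlot<>();
        String[] received = new String[1];
        Thread taker = new Thread(() -> {
            try {
                received[0] = slot.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taker.start();
        slot.put("hello");
        try {
            taker.join(); // after join(), the taker's write to received[0] is visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints "hello"
    }
}
```

The result is the same whether put() or take() runs first: if the value is already present the guard is satisfied immediately, otherwise the taker sleeps until notifyAll() wakes it.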
2. Implement the Producer Thread
Create a class NumberProducer that:
- Generates N random integers (where N is passed in the constructor).
- Pushes them into the PipelineQueue<Integer>.
- Closes the queue after producing N numbers.
Example:
    PipelineQueue<Integer> queue = new PipelineQueue<>();
    new NumberProducer(queue, 10).start();
3. Implement the Processor Thread
Create a class NumberProcessor that:
- Reads numbers from the PipelineQueue<Integer>.
- Doubles each number.
- Pushes the result into another PipelineQueue<Integer>.
- Closes the output queue after processing all numbers.
4. Implement the Consumer Thread
Create a class PrimeFilter that:
- Reads numbers from the processed queue.
- Prints the number if it’s prime.
- Notifies observers when a prime number is found.
5. Implement an Observer
Create an Observer interface with:
    public interface Observer {
        void update(int prime);
    }
- Implement a LoggerObserver class that writes every prime number to System.out.
6. Implement the Main Program
- Create instances of NumberProducer, NumberProcessor, and PrimeFilter.
- Attach an observer (LoggerObserver) to the PrimeFilter before starting it.
- Start the threads.
- Ensure that all threads terminate properly.
Solution
1. Thread-Safe Queue (PipelineQueue.java)
This queue uses the Guarded Suspension pattern for thread synchronization: pull() waits inside a while loop until the queue is non-empty or has been closed.
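    import java.util.LinkedList;

    public class PipelineQueue<E> {
        private final LinkedList<E> queue = new LinkedList<>();
        private boolean active = true;

        // Adds an element and wakes waiting threads; ignored once the queue is closed.
        public synchronized void push(E element) {
            if (!active) return;
            queue.add(element);
            notifyAll(); // Wake up any waiting threads
        }

        // Blocks while the queue is empty and still open.
        // Returns null once the queue is closed and drained, or if the thread is interrupted.
        public synchronized E pull() {
            while (queue.isEmpty() && active) {
                try {
                    wait(); // Releases the lock until push() or close() calls notifyAll()
                } catch (InterruptedException e) {
                    // Restore the flag and stop waiting. Looping back into wait()
                    // with the flag set would throw again immediately and spin.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return queue.isEmpty() ? null : queue.removeFirst();
        }

        // Marks the queue as closed; remaining elements can still be pulled.
        public synchronized void close() {
            active = false;
            notifyAll(); // Wake up any remaining waiting threads
        }
    }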
2. Producer Thread (NumberProducer.java)
Generates random numbers and pushes them into the queue.
    import java.util.Random;

    public class NumberProducer extends Thread {
        private final PipelineQueue<Integer> queue;
        private final int count;
        private final Random random = new Random();

        public NumberProducer(PipelineQueue<Integer> queue, int count) {
            this.queue = queue;
            this.count = count;
        }

        @Override
        public void run() {
            for (int i = 0; i < count; i++) {
                int num = random.nextInt(50) + 1; // Random number from 1 to 50
                System.out.println("Producer generated: " + num);
                queue.push(num);
            }
            queue.close(); // Close queue after producing numbers
        }
    }
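Because the producer draws from an unseeded Random, every run yields different numbers, which can make a pipeline bug hard to reproduce. One option, sketched here as an assumption rather than a lab requirement, is to pass a fixed seed to the Random(long) constructor. The class name SeededNumbers, the helper generate, and the seed value 42L are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class SeededNumbers {
    // Returns `count` values in the range 1..50, drawn from a Random with the given seed.
    static List<Integer> generate(long seed, int count) {
        Random random = new Random(seed);
        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            numbers.add(random.nextInt(50) + 1);
        }
        return numbers;
    }

    public static void main(String[] args) {
        List<Integer> first = generate(42L, 10);
        List<Integer> second = generate(42L, 10);
        // Two generators created with the same seed produce the same sequence.
        System.out.println(first.equals(second)); // prints "true"
    }
}
```

In the lab's producer, the same effect could be had by adding a constructor parameter for the seed; the thread interleaving would still vary between runs, only the generated numbers would be fixed.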
3. Processor Thread (NumberProcessor.java)
Doubles numbers and passes them to the next queue.
    public class NumberProcessor extends Thread {
        private final PipelineQueue<Integer> inputQueue;
        private final PipelineQueue<Integer> outputQueue;

        public NumberProcessor(PipelineQueue<Integer> inputQueue, PipelineQueue<Integer> outputQueue) {
            this.inputQueue = inputQueue;
            this.outputQueue = outputQueue;
        }

        @Override
        public void run() {
            Integer num;
            // pull() returns null once the input queue is closed and empty
            while ((num = inputQueue.pull()) != null) {
                int doubled = num * 2;
                System.out.println("Processor doubled: " + doubled);
                outputQueue.push(doubled);
            }
            outputQueue.close(); // Close the queue after processing
        }
    }
4. Consumer Thread (PrimeFilter.java)
Checks if numbers are prime and notifies observers.
    import java.util.ArrayList;
    import java.util.List;

    public class PrimeFilter extends Thread {
        private final PipelineQueue<Integer> queue;
        private final List<Observer> observers = new ArrayList<>();

        public PrimeFilter(PipelineQueue<Integer> queue) {
            this.queue = queue;
        }

        // Call before start(): the observer list is not synchronized.
        public void addObserver(Observer observer) {
            observers.add(observer);
        }

        @Override
        public void run() {
            Integer num;
            // pull() returns null once the queue is closed and empty
            while ((num = queue.pull()) != null) {
                if (isPrime(num)) {
                    System.out.println("Prime found: " + num);
                    notifyObservers(num);
                }
            }
        }

        // Trial division by every candidate up to the square root
        private boolean isPrime(int num) {
            if (num < 2) return false;
            for (int i = 2; i <= Math.sqrt(num); i++) {
                if (num % i == 0) return false;
            }
            return true;
        }

        private void notifyObservers(int prime) {
            for (Observer observer : observers) {
                observer.update(prime);
            }
        }
    }
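One consequence of the pipeline design is worth checking. The processor doubles every value, so PrimeFilter only ever sees even numbers from 2 to 100, and the only even prime is 2. A prime is therefore reported only when the producer happens to generate 1, which for 10 numbers occurs in fewer than one run in five. The sketch below confirms this by brute force; DoubledPrimes is an illustrative name, and its isPrime is a trial-division check written for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class DoubledPrimes {
    // Trial division up to the square root of num.
    static boolean isPrime(int num) {
        if (num < 2) return false;
        for (int i = 2; (long) i * i <= num; i++) {
            if (num % i == 0) return false;
        }
        return true;
    }

    // Doubles every possible producer value (1..50) and keeps the results that are prime.
    static List<Integer> primesAfterDoubling() {
        List<Integer> primes = new ArrayList<>();
        for (int n = 1; n <= 50; n++) {
            int doubled = n * 2;
            if (isPrime(doubled)) {
                primes.add(doubled);
            }
        }
        return primes;
    }

    public static void main(String[] args) {
        System.out.println(primesAfterDoubling()); // prints "[2]"
    }
}
```

A run that prints no "Prime found" lines is therefore expected behavior, not a synchronization bug.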
5. Observer Interface (Observer.java)
Observers get notified when a prime number is found.
    public interface Observer {
        void update(int prime);
    }
6. Observer Implementation (LoggerObserver.java)
Logs the detected prime numbers.
    public class LoggerObserver implements Observer {
        @Override
        public void update(int prime) {
            System.out.println("Observer: " + prime + " was found as a prime!");
        }
    }
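Since Observer declares a single abstract method, a lambda can also serve as an observer, which is handy for collecting primes in a test instead of printing them. The sketch below redeclares the interface so that it compiles on its own; ObserverLambdaDemo and collectNotified are illustrative names, not part of the lab.

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as the lab's Observer interface, repeated so the sketch stands alone.
interface Observer {
    void update(int prime);
}

class ObserverLambdaDemo {
    // Sends each value to a list-backed lambda observer and returns what it recorded.
    static List<Integer> collectNotified(int... primes) {
        List<Integer> seen = new ArrayList<>();
        Observer collector = prime -> seen.add(prime); // lambda instead of a named class
        for (int prime : primes) {
            collector.update(prime);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectNotified(2, 3, 5)); // prints "[2, 3, 5]"
    }
}
```

With the lab's classes, the equivalent call would be an addObserver with a lambda argument, made before the PrimeFilter thread is started.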
7. Main Class (Main.java)
Runs the pipeline with all components.
    public class Main {
        public static void main(String[] args) {
            PipelineQueue<Integer> queue1 = new PipelineQueue<>(); // producer -> processor
            PipelineQueue<Integer> queue2 = new PipelineQueue<>(); // processor -> consumer

            NumberProducer producer = new NumberProducer(queue1, 10);
            NumberProcessor processor = new NumberProcessor(queue1, queue2);
            PrimeFilter consumer = new PrimeFilter(queue2);

            // Attach the observer before starting the consumer thread.
            LoggerObserver logger = new LoggerObserver();
            consumer.addObserver(logger);

            producer.start();
            processor.start();
            consumer.start();

            try {
                producer.join();
                processor.join();
                consumer.join();
            } catch (InterruptedException e) {
                // Restore the interrupt status and skip the success message.
                Thread.currentThread().interrupt();
                System.err.println("Main thread was interrupted while waiting for the pipeline.");
                return;
            }
            System.out.println("All threads have finished.");
        }
    }
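If a stage never closes its output queue, the downstream thread waits forever and a plain join() never returns. As a hedged sketch of one way to detect that during testing, Thread.join(long millis) combined with isAlive() gives a bounded wait. The helper name joinAll and the timeout values are assumptions, not part of the lab.

```java
class JoinWithTimeout {
    // Waits up to timeoutMillis (must be > 0; join(0) waits forever) for each thread in turn.
    // Returns true only if every thread has terminated.
    static boolean joinAll(long timeoutMillis, Thread... threads) {
        boolean allFinished = true;
        for (Thread thread : threads) {
            try {
                thread.join(timeoutMillis); // returns early once the thread terminates
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (thread.isAlive()) {
                allFinished = false; // still running after the timeout
            }
        }
        return allFinished;
    }

    public static void main(String[] args) {
        Thread quick = new Thread(() -> { /* finishes immediately */ });
        quick.start();
        System.out.println(joinAll(1000, quick)); // prints "true"
    }
}
```

In Main, a false result from such a helper would point to a stage that is still blocked in pull(), most likely because an upstream close() call is missing.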