COSC 3P91 - Lab Test Example

In this lab test, you will implement a multi-stage pipeline using multiple threads. The pipeline will consist of:

  1. A Producer thread that generates random numbers.

  2. A Processor thread that doubles the numbers.

  3. A Consumer thread that prints only prime numbers.

  4. An Observer that logs updates whenever a prime number is printed.

You must ensure proper thread synchronization using synchronized, wait(), and notifyAll().


Instructions

1. Implement a Thread-Safe Queue

Create a generic class PipelineQueue< E > that acts as a thread-safe queue using the Guarded Suspension Pattern.

  • Implement the following methods:
public void push(E element);
public E pull();
public void close();
  • push(E element): Adds an element and notifies waiting threads.
  • pull(): Blocks if empty until an element is available.
  • close(): Marks queue as inactive. pull() should return null once the queue is empty.

2. Implement the Producer Thread

Create a class NumberProducer that:

  • Generates N random integers (where N is passed in the constructor).
  • Pushes them into the PipelineQueue< Integer >.
  • Closes the queue after producing N numbers.

Example:

PipelineQueue<Integer> queue = new PipelineQueue<>();
new NumberProducer(queue, 10).start();

3. Implement the Processor Thread

Create a class NumberProcessor that:

  • Reads numbers from the PipelineQueue< Integer >.
  • Doubles each number.
  • Pushes the result into another PipelineQueue< Integer >.
  • Closes the output queue after processing all numbers.

4. Implement the Consumer Thread

Create a class PrimeFilter that:

  • Reads numbers from the processed queue.
  • Prints the number if it’s prime.
  • Notifies observers when a prime number is found.

5. Implement an Observer

Create an Observer interface with:

public interface Observer {
    void update(int prime);
}
  • Implement a LoggerObserver class that writes every prime number to System.out.

6. Implement the Main Program

  • Create instances of NumberProducer, NumberProcessor, and PrimeFilter.
  • Start the threads.
  • Attach an observer (LoggerObserver) to the PrimeFilter.
  • Ensure that all threads terminate properly.

Solution

1. Thread-Safe Queue (PipelineQueue.java)

This queue uses Guarded Suspension Pattern for thread synchronization.

import java.util.LinkedList;
 
public class PipelineQueue<E> {
    private final LinkedList<E> queue = new LinkedList<>();
    private boolean active = true;
 
    public synchronized void push(E element) {
        if (!active) return;
        queue.add(element);
        notifyAll(); // Wake up any waiting threads
    }
 
    public synchronized E pull() {
        while (queue.isEmpty() && active) {
            try {
                wait(); // Wait if the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.isEmpty() ? null : queue.removeFirst();
    }
 
    public synchronized void close() {
        active = false;
        notifyAll(); // Wake up any remaining waiting threads
    }
}

2. Producer Thread (NumberProducer.java)

Generates random numbers and pushes them into the queue.

import java.util.Random;
 
public class NumberProducer extends Thread {
    private final PipelineQueue<Integer> queue;
    private final int count;
    private final Random random = new Random();
 
    public NumberProducer(PipelineQueue<Integer> queue, int count) {
        this.queue = queue;
        this.count = count;
    }
 
    @Override
    public void run() {
        for (int i = 0; i < count; i++) {
            int num = random.nextInt(50) + 1; // Random number from 1 to 50
            System.out.println("Producer generated: " + num);
            queue.push(num);
        }
        queue.close(); // Close queue after producing numbers
    }
}

3. Processor Thread (NumberProcessor.java)

Doubles numbers and passes them to the next queue.

public class NumberProcessor extends Thread {
    private final PipelineQueue<Integer> inputQueue;
    private final PipelineQueue<Integer> outputQueue;
 
    public NumberProcessor(PipelineQueue<Integer> inputQueue, PipelineQueue<Integer> outputQueue) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
    }
 
    @Override
    public void run() {
        Integer num;
        while ((num = inputQueue.pull()) != null) {
            int doubled = num * 2;
            System.out.println("Processor doubled: " + doubled);
            outputQueue.push(doubled);
        }
        outputQueue.close(); // Close the queue after processing
    }
}

4. Consumer Thread (PrimeFilter.java)

Checks if numbers are prime and notifies observers.

import java.util.ArrayList;
import java.util.List;
 
public class PrimeFilter extends Thread {
    private final PipelineQueue<Integer> queue;
    private final List<Observer> observers = new ArrayList<>();
 
    public PrimeFilter(PipelineQueue<Integer> queue) {
        this.queue = queue;
    }
 
    public void addObserver(Observer observer) {
        observers.add(observer);
    }
 
    @Override
    public void run() {
        Integer num;
        while ((num = queue.pull()) != null) {
            if (isPrime(num)) {
                System.out.println("Prime found: " + num);
                notifyObservers(num);
            }
        }
    }
 
    private boolean isPrime(int num) {
        if (num < 2) return false;
        for (int i = 2; i <= Math.sqrt(num); i++) {
            if (num % i == 0) return false;
        }
        return true;
    }
 
    private void notifyObservers(int prime) {
        for (Observer observer : observers) {
            observer.update(prime);
        }
    }
}

5. Observer Interface (Observer.java)

Observers get notified when a prime number is found.

public interface Observer {
    void update(int prime);
}

6. Observer Implementation (LoggerObserver.java)

Logs the detected prime numbers.

public class LoggerObserver implements Observer {
    @Override
    public void update(int prime) {
        System.out.println("Observer: " + prime + " was found as a prime!");
    }
}

7. Main Class (Main.java)

Runs the pipeline with all components.

public class Main {
    public static void main(String[] args) {
        PipelineQueue<Integer> queue1 = new PipelineQueue<>();
        PipelineQueue<Integer> queue2 = new PipelineQueue<>();
 
        NumberProducer producer = new NumberProducer(queue1, 10);
        NumberProcessor processor = new NumberProcessor(queue1, queue2);
        PrimeFilter consumer = new PrimeFilter(queue2);
 
        LoggerObserver logger = new LoggerObserver();
        consumer.addObserver(logger);
 
        producer.start();
        processor.start();
        consumer.start();
 
        try {
            producer.join();
            processor.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("All threads have finished.");
    }
}