Share this page 

Pipe the output of a thread to the input of other threadsTag(s): Thread


The idea is to make the READ operation ATOMIC. Once the READING is started for one thread, it cannot be interrupted by another one.
   Pipeline Thread
  
      +----------+         +----------+
      | thread A |  ---- > | thread B |
      +----------+    |    +----------+
       (produce)      |     (consume)
                      | 
                      |
                      |    +----------+
                      +- > | thread C |
                           +----------+ 
(CONSUME)

The ProduceData.class and ConsumeData.class are the same as the previous JAVA How-to.

[AtomicInputStream.java]

import java.io.*;

public class AtomicInputStream extends PipedInputStream {
  int atom = 0;

  AtomicInputStream(int atom) {
    super();
    this.atom = atom;
    }
  public synchronized int atomicRead(byte[] x)  {
    try {
      read(x, 0, atom);
      }
    catch (Exception e) {
      e.printStackTrace();
      return -1;
      } 
    return atom;
    }
}

[SendProduction.java]

import java.io.*;

public class SendProduction extends ProduceData {
  OutputStream os;

  SendProduction(OutputStream os) {
  super(os);
  this.os = os;
  }

  public boolean dataProduction() {
    byte[] j = new byte[3];
    boolean done = false;
    j[0] = 0 ; j[1] = 1; j[2] = 2;  
  
    while(!done) {
      try {
        os.write(j, 0, 3);
        }
      catch (Exception e) { 
        e.printStackTrace();
        return true;
        }   
      }
    return done;    
    }
}

[ReceiveProduction.java]

import java.io.*;

public class ReceiveProduction extends ConsumeData {
  AtomicInputStream as;

  ReceiveProduction(AtomicInputStream as) {
    super(as);
    this.as = as;
    }

  public boolean dataConsumption() {
    byte [] i = new byte[3]; 
    try {
      for (;;) {
        as.read(i);
        System.out.println
          (Thread.currentThread().getName()+": " +
           i[0] + " " + i[1] + " " + i[2]);
        }
      }
    catch (Exception e) {
      e.printStackTrace();
      }
    return true;
    } 
}

[TestThread.java]

import java.io.*;

public class TestThread {
   public static void main(String a[]){
     try {
       PipedOutputStream os = new PipedOutputStream();
       AtomicInputStream as = new AtomicInputStream(3);
       os.connect(as);
       new SendProduction(os);
       new ReceiveProduction(as);       
       new ReceiveProduction(as);
       new ReceiveProduction(as);           
       }
     catch (Exception e) {
       e.printStackTrace();
       }
     }
}
That's OK for the situation One Producer and Many consumers.
To support many producers, simply create a AtomicOutputStream class with a synchronized atomicWrite method in it.

Using a PipedOutputStream is complicated, a simpler solution is to use Queue (especially with Java 5). A producer sends data to the Queue and the the consumers extract the data. See this HowTo.