Monday, November 07, 2016

Java threads Tutorial or Multithreading Tutorial in Java

Java threads Tutorial or Java 8 concurrency Tutorial or Java 8 Threads or Multithreading tutorial in Java


In this article we will discuss Java threads and the new concurrency features in Java 8.
The topics covered are listed below. Each one has a detailed example,
and you can download the samples for all of these examples here.

Please click here to Download all the Examples  and import them as Java Projects.
Thread2DemoJava8Features.zip
and
Thread1DemoJava7Features.zip

You need JDK 8.

What is a Java thread?

A thread is a separate path of execution within a process. A thread is lightweight compared to a process.
A process has its own memory space (more precisely, its own address space). A process can contain many threads, and all of them
can access the memory of that process. A process cannot access the memory (address space) of another process unless it uses something like IPC (inter-process communication). Threads share the process's resources, including memory and open files. This makes communication between threads efficient, but potentially problematic.
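To make the shared-memory point concrete, here is a minimal sketch (the class name SharedMemoryDemo and the thread names are made up for illustration). Two threads write into the same list object, and the main thread reads both entries after join(). The list is wrapped with Collections.synchronizedList because, as noted above, sharing is efficient but needs care.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SharedMemoryDemo {

    // Starts two threads that append to the same list and returns that list.
    static List<String> runDemo() {
        // One object in this process's memory, reachable from every thread of the process.
        List<String> shared = Collections.synchronizedList(new ArrayList<String>());

        Runnable writer = () -> shared.add("written by " + Thread.currentThread().getName());
        Thread t1 = new Thread(writer, "worker-1");
        Thread t2 = new Thread(writer, "worker-2");
        t1.start();
        t2.start();

        try {
            // Wait for both writers so that their entries are visible here.
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return shared;
    }

    public static void main(String[] args) {
        // The order of the two entries may differ from run to run.
        System.out.println(runDemo());
    }
}
```

Two separate processes could not exchange data this way; they would need a file, a socket, or some other IPC mechanism.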

Key Java concurrency topics we will discuss:

Here is a high-level overview of the topics we will cover in this post. It is going to be a long article, but trust me: once you have read the entire article you will appreciate all the features and how each can be used in a different context to solve a particular problem. I will give a brief overview first, then we will discuss each topic in depth with a working example.
  • Creating a thread using Thread class and Runnable Interface.
  • Creating a thread using Thread class and Runnable Interface using Java 8 Lambda Syntax.
  • Running multiple threads using an ExecutorService and starting them with ExecutorService.submit()
    e.g. ExecutorService executor = Executors.newFixedThreadPool(2);
  • Callable and Future interfaces: Want your thread's run method to return a result? Use the Callable interface and put your code in the call() method, which is similar to run() but can return a result. When a Callable is submitted to an executor, the result comes back wrapped in a Future<?>.
  • Using ExecutorService.invokeAll() to submit multiple tasks that implement the Callable<...> interface.
  • Using "Executors.newWorkStealingPool()", where the number of threads may grow or shrink. A work-stealing pool makes no guarantees about the order in which submitted tasks are executed.
  • Using "executor.invokeAny(callableTasks);": instead of returning Future objects, this method blocks until the first callable completes successfully and returns the result of that callable.
  • Scheduling when to run a task: want to run a callable after a specified delay? Here are some options.
    • ScheduledFuture<Employee> schedFuture0 = executor.schedule(task0, 3, TimeUnit.SECONDS); //Execute after 3 seconds
    • ScheduledFuture<Employee> schedFuture1 = executor.schedule(task1, 10, TimeUnit.SECONDS); //Execute after 10 seconds
    • ScheduledFuture<Employee> schedFuture2 = executor.schedule(task2, 3, TimeUnit.SECONDS); //Execute after 3 seconds.
  • Want to run a task repeatedly with a fixed delay or at a fixed rate?
    • executor.scheduleWithFixedDelay(task, initialDelay, period, TimeUnit.SECONDS); 
    • executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
  • Synchronized block vs synchronized method.
  • Calling wait(), notify() and notifyAll() in a synchronized context
    • wait() tells the current thread (the thread executing code inside a synchronized method or block) to give up the monitor and go into the waiting state.
    • notify() wakes up a single thread that is waiting on this object's monitor.
    • notifyAll() wakes up all the threads that called wait() on the same object.
  • Volatile: why a variable should be marked volatile (so that a write by one thread is visible to the other threads, which re-read the value each time instead of using a cached copy).
  • CountDownLatch
    CountDownLatch is used to start a series of threads and then wait until all of them are complete (or until they call countDown() a given number of times). A CountDownLatch cannot be reused after reaching the final count.
    (In a sense it is the opposite of the Semaphore discussed further below. If you want a reusable alternative to CountDownLatch, try "CyclicBarrier".)
  • CyclicBarrier:
    A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
    CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
    The barrier is called cyclic because it can be re-used after the waiting threads are released.
    (CyclicBarrier is similar to CountDownLatch, except that a CyclicBarrier can be reused.)
  • CountDownLatch vs CyclicBarrier:
    • CountDownLatch cannot be reused after reaching the final count.
    • CountDownLatch lets one or more threads wait until other threads have called countDown() the required number of times; the threads that count down do not wait for each other.
    • CyclicBarrier can be reset and thus reused.
    • CyclicBarrier makes a fixed group of parallel threads wait for each other at a common point.
  • ReentrantLock:
    A re-entrant mutual exclusion lock with the same basic behavior and semantics as the implicit monitor lock accessed using
    synchronized methods or statements, but with extended capabilities.
    Locks support various methods for finer-grained lock control and are thus more expressive than implicit monitors.
  • ReentrantLock with Condition:
    The Condition interface factors out the java.lang.Object monitor methods (wait(), notify() and notifyAll()) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces synchronized methods and statements, a Condition replaces the Object monitor methods.
    {eg:
          private Lock lock = new ReentrantLock();
          private Condition condition = lock.newCondition();
          condition.await();  //Wait for the signal and release the lock, much like wait()
          condition.signal(); //Signal one thread that is waiting on this condition, much like notify()
    }
  • ReadWriteLock: Many threads can hold the read lock at the same time, but only one thread can hold the write lock.
  • StampedLock: Java 8 ships with a new kind of lock called StampedLock, which also supports read and write locks just like ReadWriteLock. In contrast to ReadWriteLock, the locking methods of a StampedLock return a stamp represented by a long value.
  • Semaphores: A Semaphore is used to control the number of concurrent threads that are using a resource. Whereas re-entrant locks usually grant exclusive access to variables or resources, a semaphore is capable of maintaining a whole set of permits. It is like having tokens: once all the tokens are in use, we need to wait for a token to be released before someone else can use it.
  • Deadlock: What is a deadlock? It occurs when two threads are each waiting for a resource that the other one has locked, so neither can proceed.
    e.g. Say we have two lists (list1 and list2).
    One thread locks list1 and then list2, in that order. A second thread locks list2 and then list1.
    If each thread gets its first lock before the other releases it, neither will ever get both locks, because the locks are taken in different orders.
  • Preventing the deadlock in the above case by using "ReentrantLock": declare one lock per resource and acquire all the locks together.
  • Producer/Consumer example using synchronized keyword, wait() and notify() features.
  • Producer/Consumer example using ArrayBlockingQueue: it is thread safe and can be accessed by multiple threads. If you look at the implementation, ArrayBlockingQueue in fact uses a ReentrantLock internally. If the queue is full, put() blocks until space becomes available.
    If the queue is empty, take() blocks until an element becomes available.
  • Interrupting a thread: We can interrupt a thread by calling interrupt() on it (Thread.currentThread().interrupt() interrupts the calling thread itself).
    • If the thread is sleeping, an InterruptedException will be thrown.
    • If the thread is running, a flag will be set that we can check using "Thread.currentThread().isInterrupted()".
    • If the thread is blocked in I/O on an interruptible channel, the channel will be closed, the thread's interrupt status will be set, and the thread will receive a "java.nio.channels.ClosedByInterruptException".
  • AtomicInteger: Thread safe. Internally, the atomic classes make heavy use of compare-and-swap (CAS), an atomic instruction directly supported by most modern CPUs. Those instructions are usually much faster than synchronizing via locks. So my advice is to prefer atomic classes over locks when you just have to change a single mutable variable concurrently.
  • ConcurrentHashMap: Like Hashtable, it is thread safe. ConcurrentHashMap uses multiple buckets to store data and does not lock the whole table for each operation. This avoids read locks and greatly improves performance over a Hashtable. Both are thread safe, but there are obvious performance wins with ConcurrentHashMap.
    When you read from a ConcurrentHashMap using get(), no lock is taken, in contrast to Hashtable, where all operations are simply synchronized. Hashtable dates from the earliest versions of Java, whereas ConcurrentHashMap was added in Java 5.
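As a quick illustration of the last two items, here is a small sketch (the class name AtomicCounterDemo and the constants are assumptions chosen for this example). Four pool threads update an AtomicInteger and a ConcurrentHashMap entry 10,000 times each without any explicit lock, and no update is lost.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {

    static final int THREADS = 4;
    static final int INCREMENTS = 10_000;

    // Returns { value of the AtomicInteger, value stored under "total" in the map }.
    static int[] runDemo() {
        AtomicInteger hits = new AtomicInteger();
        ConcurrentHashMap<String, Integer> perKey = new ConcurrentHashMap<>();

        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        for (int t = 0; t < THREADS; t++) {
            pool.submit(() -> {
                for (int i = 0; i < INCREMENTS; i++) {
                    hits.incrementAndGet();                 // atomic read-modify-write
                    perKey.merge("total", 1, Integer::sum); // atomic update of one key
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new int[] { hits.get(), perKey.get("total") };
    }

    public static void main(String[] args) {
        int[] result = runDemo();
        System.out.println("AtomicInteger = " + result[0] + ", map total = " + result[1]);
    }
}
```

With a plain int field and count++ in place of incrementAndGet(), some increments would typically be lost, because count++ is a separate read, add and write.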
Examples for each of the scenarios we described above are as follows.

Creating a thread using Thread class and Runnable Interface: 


Threads in Java can be created by extending the Thread class or by implementing the Runnable interface.
Here is an example of extending the Thread class. However, this prevents your class from extending
any other class, in which case you can implement the Runnable interface instead.

Thread creation by extending the Thread class.
package demo01ThreadClass;
/*****
 * Creating a thread using Thread class by overriding run() method.
 * @author twreddy
 ******/
class Runner extends Thread{
 
 
 public Runner(String threadName){
  setName(threadName);
 }
 
 @Override
 public void run() {
  
  for(int i=0; i < 10; i++){
   System.out.println("Hello "+ getName() +" "+ i);
   try{
    sleep(100);
   }catch(InterruptedException exp){
    exp.printStackTrace();
   }
  }
 }
}

public class App {

 public static void main(String[] args) {
 
  Runner r1 = new Runner("Thread1");
  Runner r2 = new Runner("Thread2");
  r1.start();
  r2.start();
  
 }
}
An example of how to create a thread using the Runnable interface:



/**
 * Creating a thread by implementing Runnable interface.
 * All code should be in run() method.
 * We just need to override run method.
 * @author twreddy
 *
 */
class Runner implements Runnable {

 @Override
 public void run() {
  for(int i=0; i < 10; i++){
   System.out.println("Hello "+Thread.currentThread().getName() +" "+ i);
   try{
    Thread.sleep(15);
   }catch(InterruptedException exp){
    exp.printStackTrace();
   }
  }
  
 }
}

public class App {
 
 public static void main(String[] args) {
  Runner r1 = new Runner();
  Runner r2 = new Runner();
  
  Thread t1 = new Thread(r1,"Test Thread 1");
  Thread t2 = new Thread(r2,"Test Thread 2");
  
  t1.start();
  t2.start();
 }
}

Creating a thread using the Runnable interface in an anonymous class



package demo03Anonymousclass;

/**
 * @author twreddy
 *  Using anonymous class to implement Runnable interface
 */
public class App {
 
 public static void main(String[] args) {
  //Anonymous class implementing the Runnable interface
  Thread t = new Thread(new Runnable(){
  @Override
  public void run() {
   for(int i=0; i < 10; i++){
    System.out.println("Hello "+Thread.currentThread().getName() +" "+ i);
     try{
      Thread.sleep(15);
     }catch(InterruptedException exp){
      exp.printStackTrace();
     }
       }
    
   }
  });
   
  //Start the thread
   t.start();
 }
}

Creating a thread using the Runnable interface with Java 8 lambda syntax, and also the Java 7 way (no lambda).

If you are not familiar with Java 8 lambda expressions, please read this article first:
http://reddymails.blogspot.com/2016/09/lambda-expressions-java-8.html

package chap01runnable;

import java.util.concurrent.TimeUnit;
/**
 * @author twreddy
 *  A simple thread created using the Runnable interface and overriding the run() method.
 */
public class TestRunnable {

 public static  int myCount = 0;
  
 public static void main(String[] args) {
  
  //Get the lambda version thread going first    
  Thread th = new Thread(getRunnable());
  th.setName("I am using Lambda version ");
  th.start();
  
  //java 7 
  MyRunnableThread  runnableThread  = new TestRunnable(). new MyRunnableThread();
  Thread th1 = new Thread(runnableThread);
  th1.start();
 }

 /*
  * Java 8 
  */
 public static Runnable getRunnable() {

  Runnable run = () -> {
   String threadName = Thread.currentThread().getName();
      try{
       for (int i=0; i < 10; i++){
        System.out.println(threadName);
        //Same pause as Thread.sleep(100): 100 milliseconds, not 100 seconds
         TimeUnit.MILLISECONDS.sleep(100);
       }
            } catch (InterruptedException iex) {
                System.out.println("Exception in thread: "+iex.getMessage());
            }
      
  };
  return run;
 }

 /*
  * Java 7 ways
  */
 class MyRunnableThread implements Runnable{
    
     public MyRunnableThread(){
          
     }
     public void run() {
         while(myCount <= 10){
             try{
                 System.out.println("Expl Thread: "+(++myCount));
                 //Same pause as Thread.sleep(100): 100 milliseconds, not 100 seconds
                 TimeUnit.MILLISECONDS.sleep(100);
             } catch (InterruptedException iex) {
                 System.out.println("Exception in thread: "+iex.getMessage());
             }
         }
     }
 }
 
} 



How can you run multiple threads at the same time? Or submit "n" threads as a batch?


Java's ExecutorService comes in several flavors that you can use to create a pool of threads and submit tasks for execution.
Here we run multiple tasks using an ExecutorService and start them with ExecutorService.submit(),
e.g. ExecutorService executor = Executors.newFixedThreadPool(2);
Once an ExecutorService instance is created, we submit the tasks using the submit() method. A task can start running as soon as it is submitted and a pool thread is free.
Once all tasks are submitted, we need to invoke "executor.shutdown();"
This tells the executor that no new tasks will be accepted; tasks that were already submitted still run to completion.
You can then wait for the service to finish, with a timeout, like this. The call blocks until all tasks have completed execution after a shutdown
request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

// A timeout is always required; pick a generous value if you want to wait until all tasks are completed
executor.awaitTermination(1, TimeUnit.MINUTES);

However, if the wait times out or is interrupted, some tasks may still be running. In that case you can force the service to shut down in a finally block using
executor.shutdownNow();

This attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
(Which I learnt from http://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/ )
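To see what shutdownNow() hands back, here is a minimal sketch (the class name ShutdownNowDemo and the task bodies are made up for illustration). A single-thread executor is kept busy by one long task while two more tasks wait in the queue. shutdownNow() interrupts the running task and returns the two that never started.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowDemo {

    // Returns the number of queued tasks that never got a chance to start.
    static int runDemo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch firstTaskStarted = new CountDownLatch(1);

        // This task occupies the only worker thread until it is interrupted.
        executor.submit(() -> {
            firstTaskStarted.countDown();
            try {
                TimeUnit.SECONDS.sleep(30);
            } catch (InterruptedException e) {
                // shutdownNow() interrupts running tasks; restore the flag and finish.
                Thread.currentThread().interrupt();
            }
        });
        // These two have to wait in the queue behind the first task.
        executor.submit(() -> System.out.println("queued task 1"));
        executor.submit(() -> System.out.println("queued task 2"));

        try {
            firstTaskStarted.await(); // make sure the first task is really running
            List<Runnable> neverStarted = executor.shutdownNow();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate");
            }
            return neverStarted.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Tasks that never started: " + runDemo());
    }
}
```

Note that shutdownNow() only interrupts the running task. A task that ignores interruption keeps running, which is why the first task above reacts to InterruptedException by returning.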

Here is the Java code where we submit multiple tasks.

package chap02executors;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author twreddy
 *  Example to show how to run multiple threads using Executor service.
 */
public class TestExecutors {

  public static  int myCount = 0;
 
 public static void main(String[] args) {
  
  //A single-thread executor would run the tasks one after another
  //ExecutorService executor = Executors.newSingleThreadExecutor();  
  //Instead use a fixed pool of 2 threads so that both tasks can run in parallel.
  ExecutorService executor = Executors.newFixedThreadPool(2); 
  executor.submit(getRunnable());
  
  MyRunnableThread runnbale = new TestExecutors().new MyRunnableThread();
  executor.submit(runnbale);
  //If you don't shut down the executor, the program will not terminate
  System.out.println("================ Trying to Shut down ===========");
  //executor.shutdownNow(); // Will force all threads to stop
  //executor.shutdown(); //Will wait till all are complete 
  controlledShutDown(executor, 1);
  System.out.println("================ Shut down Complete ===========");
  
 }
  
 /*
  * Java 8 
  */
 public static Runnable getRunnable() {

  Runnable run = () -> {
   String threadName = Thread.currentThread().getName();
      try{
       for (int i=0; i < 20; i++){
        System.out.println("Lambda version = "+ threadName);
        //Thread.sleep(100);
         TimeUnit.MILLISECONDS.sleep(200);
       }
            } catch (InterruptedException iex) {
                System.out.println("Exception in thread: "+iex.getMessage());
            }
      
  };
  return run;
 }

 /*
  * Java 7 ways
  */
 class MyRunnableThread implements Runnable{
 
  public MyRunnableThread(){
          
     }
     public void run() {
         while(myCount <= 10){
             try{
                 System.out.println("Expl Thread: "+(++myCount));
                 //Thread.sleep(100);
                 TimeUnit.MILLISECONDS.sleep(200);
             } catch (InterruptedException iex) {
                 System.out.println("Exception in thread: "+iex.getMessage());
             }
         }
     }
 }
 
 /*
 Thanks to 
 http://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/ 
 */
 
 public static void controlledShutDown(ExecutorService executor, int waitMinutes){
  
  try {
      System.out.println("===>attempt to shutdown executor");
      executor.shutdown();
      // awaitTermination always needs a timeout; it returns early once all tasks are completed
      executor.awaitTermination(waitMinutes, TimeUnit.MINUTES);
  }
  catch (InterruptedException e) {
      System.err.println("************tasks interrupted");
  }
  finally {
      if (!executor.isTerminated()) {
          System.err.println("----------cancel non-finished tasks");
      }
      executor.shutdownNow();
      System.out.println("????????????????shutdown finished");
  }
  
 }
}


Callable and Future interfaces in Java threads:

Ever wondered how you could return a result from a thread's run method?
- If you want to return a result, use the Callable interface and put your code in the call() method (similar to run(), but it can return a result).
- When a Callable is submitted to an executor service, the value returned by call() comes back wrapped in a Future<?>.

For our example, let's use an Employee DTO.
package chap03CallableFuture;

public class Employee {
  private String firstName;
  private String lastName;
  
  public Employee (String fn, String ln){
   this.firstName= fn;
   this.lastName = ln;
  }
  
  @Override
 public String toString() {
  return "fn="+firstName +"  ln="+ lastName;
 }
}
As you can see in the code below, the call() method returns an Employee:
public Employee call() {... my run code magic happens here...}
Once you submit the task to the executor service, you get the result wrapped as a Future<Employee>.
All you need to do is call the get() method on the Future object to get your Employee object.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/*****
 * @author twreddy
 *  
 *  In case you want to return results from run method use Callable interface and put your code in call() method.
 *  Similar to the Runnable interface's run method, the Callable interface has call(). Callable is parameterized.
 *  In this example we are returning an Employee object.
 *  The result is returned wrapped in a "Future" object,
 *  so you get back a Future<Employee>, which has methods like Future.get() and Future.isDone().
 ****/
public class TestCallableFuture {

 public static void main(String[] args) throws Exception {

  ExecutorService executor = Executors.newFixedThreadPool(2); 
  Callable<Employee> task = getCallable();
  // Using Inner class to show Java 7 way of creating threads (non-lambda version)   
  Callable<Employee> task2 = new MyCallableThread();
  
  
  Future<Employee> futureLambda = executor.submit(task);
  Future<Employee> futureJava7 = executor.submit(task2);
  
  System.out.println("futureJava7.isDone()="+ futureJava7.isDone());
  //Calling the method get() blocks the current thread and waits until the callable completes  
  System.out.println(futureJava7.get());  
  
  //We can make this time bound but throws exception if the task is not complete within that time. 
  //System.out.println(futureJava7.get(2,TimeUnit.SECONDS));
  
  
  System.out.println("futureLambda.isDone()="+ futureLambda.isDone());
  //Calling the method get() blocks the current thread and waits until the callable completes 
  System.out.println(futureLambda.get());
  
  executor.shutdown();
  
 }

 /*
  * Java 8
  */
 public static Callable<Employee> getCallable() {

  Callable<Employee> task = () -> {
   try {
    System.out.println(" Lambda  version ....");
    TimeUnit.SECONDS.sleep(1);
    Employee emp = new Employee("Rama-lambda", "Chandra");
    return emp;
   } catch (InterruptedException e) {
    throw new IllegalStateException("task interrupted", e);
   }
  };

  return task;
 }

 /*
  * Java 7 ways
  */
 static  class MyCallableThread implements Callable<Employee> {

  public MyCallableThread() {

  }

  public Employee call() {
   try {
    System.out.println(" Regular java 7 version ....");
    TimeUnit.SECONDS.sleep(3);
    Employee emp = new Employee("Rama-Java7", "Chandra");
    return emp;
   } catch (InterruptedException e) {
    throw new IllegalStateException("task interrupted", e);
   }
  }

 }

}
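The commented-out timed get() in the listing above can be tried on its own. Below is a minimal sketch (the class name TimedGetDemo and the durations are assumptions for this example). The task needs about 2 seconds, the caller is only willing to wait 200 milliseconds, so get() throws a TimeoutException and the task is cancelled.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {

    // Returns the task's result, or "timed out" if it is not ready within 200 ms.
    static String runDemo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<String> slowTask = () -> {
            TimeUnit.SECONDS.sleep(2); // simulated slow work
            return "finished";
        };
        Future<String> future = executor.submit(slowTask);
        try {
            // Waits at most 200 ms for the result.
            return future.get(200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker, the result is no longer needed
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```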

Using ExecutorService.invokeAll()

Using "ExecutorService.invokeAll()" we can submit multiple tasks that implement the Callable<...> interface in one call.
This is handy when the tasks are collected at runtime and their number is not known in advance.
ExecutorService.invokeAll(Collection of Callable): executes the given tasks, returning a list of Futures holding
their status and results when all complete.

Let's use the same Employee DTO:

package chap04CallableFutureBatchorMany;

public class Employee {
  private String firstName;
  private String lastName;
  
  public Employee (String fn, String ln){
   this.firstName= fn;
   this.lastName = ln;
  }
  
  @Override
 public String toString() {
  return "fn="+firstName +"  ln="+ lastName;
 }
}
As we see here, invokeAll() can take as many tasks as you want.

package chap04CallableFutureBatchorMany;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author twreddy
 *  Submitting Callable<...> as batch using executor.invokeAll()..... (Not executor.submit())
 */
public class TestCallableFuture {

 public static void main(String[] args) throws Exception {

  ExecutorService executor = Executors.newFixedThreadPool(3); 
  
  
  Callable<Employee> task0 = getCallable("task0");
  Callable<Employee> task1 = getCallable("task1");
  Callable<Employee> task2 = new MyCallableThread();
  
  List<Callable<Employee>> callableTasks = new ArrayList <Callable<Employee>>();
  callableTasks.add(task0);
  callableTasks.add(task1);
  callableTasks.add(task2);
  
  List<Future<Employee>> futuresResults  = executor.invokeAll(callableTasks);
  
  System.out.println("================== JAVA 7 ======================");
  //Java 7
  for(Future<Employee> empFuture:futuresResults){
   System.out.println("empFuture="+empFuture.get());
  }
    
  System.out.println("================== JAVA 8 ======================");
  printContentUsingLambda(futuresResults);
     
  
  executor.shutdown();
 }
 /*
  * Java 8
  */
 public static Callable<Employee> getCallable(final String name) {

  Callable<Employee> task = () -> {
   try {
    System.out.println(" Lambda  version ...."+ name);
    TimeUnit.SECONDS.sleep(1);
    Employee emp = new Employee("Rama-lambda", name );
    return emp;
   } catch (InterruptedException e) {
    throw new IllegalStateException("task interrupted", e);
   }
  };

  return task;
 }

 /*
  * Java 7 ways
  */
 static class MyCallableThread implements Callable<Employee> {

  public MyCallableThread() {

  }

  public Employee call() {
   try {
    System.out.println(" Regular java 7 version ....");
    TimeUnit.SECONDS.sleep(3);
    Employee emp = new Employee("Rama-Java7", "Chandra");
    return emp;
   } catch (InterruptedException e) {
    throw new IllegalStateException("task interrupted", e);
   }
  }

 }
 
 /* Print the Results */
 private static void printContentUsingLambda(List<Future<Employee>> futuresResults) {
  
  //java 8  way of printing results...
  futuresResults.stream().map(future -> {
         try {
             return future.get() +" Using Lambda." ;
         }
         catch (Exception e) {
             throw new IllegalStateException(e);
         }
     }).forEach(System.out::println);
  
 }

}
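One detail of invokeAll() worth knowing: the returned list of Futures follows the order of the task list, not the order in which the tasks finish. A small sketch under that assumption about the documented contract (the class name InvokeAllOrderDemo, the helper delayed() and the task names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllOrderDemo {

    // Hypothetical helper: a task that sleeps for the given time and returns its name.
    static Callable<String> delayed(String name, long millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return name;
        };
    }

    static List<String> runDemo() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(delayed("slow", 300));   // finishes last
        tasks.add(delayed("medium", 150));
        tasks.add(delayed("fast", 10));    // finishes first

        try {
            List<String> results = new ArrayList<>();
            // invokeAll() blocks until every task is done.
            for (Future<String> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [slow, medium, fast]
    }
}
```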

Using "Executors.newWorkStealingPool()"

Here the number of threads may grow or shrink. A work-stealing pool makes no guarantees about the order in which submitted tasks are executed.
This example also uses "executor.invokeAny(callableTasks);".
Instead of returning Future objects, this method blocks until the first callable completes successfully and returns the result of that callable.

NOTE: Use the same Employee DTO as mentioned earlier. I will not paste the Employee class code here.

package chap05CallableFutureBatchInvokeAny;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author twreddy
 * 
 *  executor.invokeAny(callableTasks);  (NOT invokeAll() or submit())
 *  
 *  Instead of returning Future objects, this method blocks until the first callable completes and returns the result of that callable.
 *  Executes the given tasks, returning the result of one that has completed successfully
 *  (i.e., without throwing an exception), if any do. Upon normal or exceptional return, tasks that have not completed are cancelled.
 *  The results of this method are undefined if the given collection is modified while this operation is in progress.
 * 
 */
public class TestCallableFuture {

 public static void main(String[] args) throws Exception {

  //ExecutorService executor = Executors.newFixedThreadPool(3);
  /* 
   * The actual number of threads may
   * grow and shrink dynamically. A work-stealing pool makes no
   * guarantees about the order in which submitted tasks are
   * executed.
   */
  ExecutorService executor = Executors.newWorkStealingPool();
  
  
  Callable<Employee> task0 = getCallable("task0",8);
  Callable<Employee> task1 = getCallable("task1",4);
  Callable<Employee> task2 = new MyCallableThread();
  
  List<Callable<Employee>> callableTasks = new ArrayList <Callable<Employee>>();
  
  callableTasks.add(task0);
  callableTasks.add(task1);
  callableTasks.add(task2);
  
  //Instead of returning future objects this method blocks until the first callable terminates and returns the result of that callable.
  /* 
   * Executes the given tasks, returning the result of one that has completed successfully 
   * (i.e., without throwing an exception), if any do. Upon normal or exceptional return, tasks that have not completed are cancelled. 
   * The results of this method are undefined if the given collection is modified while this operation is in progress.
   */
  Employee employee  = executor.invokeAny(callableTasks);
  
  //Only the first task to complete successfully returns a result; the rest are cancelled
  System.out.println(" Just one Employee = "+ employee);
  
  executor.shutdown();
 }

 
 /*
  * Java 8
  */
 public static Callable<Employee> getCallable(final String name, int seconds) {

  Callable<Employee> task = () -> {
   try {
    System.out.println(" Lambda  version ...."+ name);
    TimeUnit.SECONDS.sleep(seconds);
    Employee emp = new Employee("Rama-lambda", name );
    return emp;
   } catch (InterruptedException e) {
    throw new IllegalStateException("task interrupted", e);
   }
  };

  return task;
 }

 /*
  * Java 7 way of using a class to implement an interface
  */
 static class MyCallableThread implements Callable<Employee> {
  public MyCallableThread() {
  }

  public Employee call() {
   try {
    System.out.println(" Regular java 7 version ....");
    TimeUnit.SECONDS.sleep(4);
    Employee emp = new Employee("Rama-Java7", "Chandra");
    return emp;
   } catch (InterruptedException e) {
    throw new IllegalStateException("task interrupted", e);
   }
  }
 }
}
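The "completed successfully" part of the invokeAny() contract is easy to miss: a task that fails quickly does not win. Here is a minimal sketch (the class name InvokeAnyDemo and the task bodies are made up for illustration; the parallelism of 3 is an assumption so that all three tasks can run side by side).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class InvokeAnyDemo {

    static String runDemo() {
        // Work-stealing pool with an explicit parallelism level of 3.
        ExecutorService executor = Executors.newWorkStealingPool(3);

        List<Callable<String>> tasks = new ArrayList<>();
        // Finishes first, but with an exception, so it cannot supply the result.
        tasks.add(() -> { throw new IllegalStateException("fails immediately"); });
        // First task to complete without an exception.
        tasks.add(() -> { TimeUnit.MILLISECONDS.sleep(200); return "quick"; });
        // Still sleeping when "quick" returns, so it gets cancelled.
        tasks.add(() -> { TimeUnit.SECONDS.sleep(3); return "slow"; });

        try {
            // Blocks until one task has completed successfully.
            return executor.invokeAny(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // Thrown only if no task completes successfully.
            throw new IllegalStateException("all tasks failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Winner = " + runDemo());
    }
}
```

If every task throws, invokeAny() itself throws an ExecutionException, which is what the second catch block is for.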

Scheduling when to run a thread or Scheduling a task.

Want to run a callable after a specified delay? Here are some options.
Instead of a Future<?>, we get a ScheduledFuture<?> as the return object.
  1. ScheduledFuture<Employee> schedFuture0 = executor.schedule(task0, 3, TimeUnit.SECONDS); //Execute after 3 seconds
  2. ScheduledFuture<Employee> schedFuture1 = executor.schedule(task1, 10, TimeUnit.SECONDS); //Execute after 10 seconds
  3. ScheduledFuture<Employee> schedFuture2 = executor.schedule(task2, 3, TimeUnit.SECONDS); //Execute after 3 seconds.
Let's test with an example.

package chap06SechduleExecutor;
public class Employee {
  private String firstName;
  private String lastName;
  
  public Employee (String fn, String ln){
   this.firstName= fn;
   this.lastName = ln;
  }
  
  @Override
 public String toString() {
  return "fn="+firstName +"  ln="+ lastName;
 }
}

Note that when tasks are scheduled to run in the future, the return types are wrapped as
ScheduledFuture<?>

package chap06SechduleExecutor;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author twreddy
 *  Want to run a callable at a specified time  here are some options...
 *  Yes you will get ScheduledFuture 
 */
public class TestCallableFuture {

 public static void main(String[] args) throws Exception {

  /*
  A ScheduledExecutorService is capable of scheduling tasks to run either periodically 
  or once after a certain amount of time has elapsed.
  */
  ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
  
  
  Callable<Employee> task0 = getCallable("task0");
  Callable<Employee> task1 = getCallable("task1");
  Callable<Employee> task2 = new TestCallableFuture().new MyCallableThread();
  
  // Run once after a certain amount of time has elapsed.
  ScheduledFuture<Employee> schedFuture0 = executor.schedule(task0, 3, TimeUnit.SECONDS);                    //Execute after 3 seconds
  ScheduledFuture<Employee> schedFuture1 = executor.schedule(task1, 10, TimeUnit.SECONDS); //Execute after 10 seconds
  ScheduledFuture<Employee> schedFuture2 = executor.schedule(task2, 3, TimeUnit.SECONDS); //Execute after 3 seconds.
  
  
  
  long remainingDelay0 = schedFuture0.getDelay(TimeUnit.MILLISECONDS);
  System.out.printf("Remaining Delay task0: %sms \n", remainingDelay0);
  
  long remainingDelay1 = schedFuture1.getDelay(TimeUnit.MILLISECONDS);
  System.out.printf("Remaining Delay task1: %sms \n", remainingDelay1);
  
  
  long remainingDelay2 = schedFuture2.getDelay(TimeUnit.MILLISECONDS);
  System.out.printf("Remaining Delay task2: %sms \n", remainingDelay2);
  
  System.out.println("schedFuture0="+schedFuture0.get());
  System.out.println("schedFuture1="+schedFuture1.get());
  System.out.println("schedFuture2="+ schedFuture2.get());
  
  executor.shutdown();
 }

 /**
  * Java 8
  **/
 public static Callable<Employee> getCallable(final String name) {

  Callable<Employee> task = () -> {
   try {
    System.out.println(" Lambda  version ...."+ name);
    TimeUnit.SECONDS.sleep(5);
    Employee emp = new Employee("Rama-lambda", name );
    return emp;
   } catch (InterruptedException e) {
    throw new IllegalStateException("task interrupted", e);
   }
  };

  return task;
 }

 /*
  * Java 7 way of using a class to implement an interface
  */
 class MyCallableThread implements Callable<Employee> {

  public MyCallableThread() {

  }

  public Employee call() {
   try {
    System.out.println(" Regular java 7 version ....");
    TimeUnit.SECONDS.sleep(5);
    Employee emp = new Employee("Rama-Java7", "Chandra");
    return emp;
   } catch (InterruptedException e) {
    throw new IllegalStateException("task interrupted", e);
   }
  }

 }
} 


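Want to run a task repeatedly at a fixed rate or with a fixed delay?

A ScheduledExecutorService offers two ways to run the same task again and again:
  1. executor.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.SECONDS); waits for the given delay after one run ends before starting the next.
  2. executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS); starts a run every period, measured from the start of the previous run.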
package chap07ScheduleAtFixedRateOrFixedDelay;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author twreddy
 *  Runs the same task again and again on a schedule.
 *
 *  executor.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.SECONDS);
 *    The delay is counted from the end of one run to the start of the next.
 *    With a 5 second delay and a task that takes 1 second, a run starts every 6 seconds.
 *
 *  executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
 *    A run starts every 5 seconds, even though the task itself finishes in 1 second.
 *    The initial delay applies to the first run only.
 *
 ***/
public class TestScheduleFixedRateOrFixedDelay {

 public static void main(String[] args) {
  //scheduleAtFixedRate();
  scheduleWithFixedDelay();
 }
 
 public static void scheduleWithFixedDelay(){
  
  ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
  Runnable task = getRunnable("task1");
  int initialDelay = 2;
  int period = 5;
  //The 5 second delay starts when the previous run ends.
  //The task takes 1 second, so a run starts every 6 seconds (1 second task + 5 second delay).
  executor.scheduleWithFixedDelay(task, initialDelay, period, TimeUnit.SECONDS); 
 }

 public static void scheduleAtFixedRate(){
  
  ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
  Runnable task = getRunnable("task0");
  int initialDelay = 2;
  int period = 5;
  //A run starts every 5 seconds, even though the task finishes in 1 second.
  //The initial delay applies to the first run only.
  executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS); 
 }
 
 /*
  * Java 8
  */
 public static Runnable getRunnable(final String name) {

  Runnable task = () -> {
   try {
    System.out.println(" Lambda  version ...."+ name +" Date ="+ new Date());
    TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
    throw new IllegalStateException("task interrupted", e);
   }
  };

  return task;
 }
 
}
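
To see the difference between the two methods in numbers, here is a minimal sketch. It is not part of the downloadable samples: the class name FixedRateVsFixedDelay and the helper measureGaps() are made up for illustration, and the 50 ms task and 100 ms delay/period are arbitrary values. It runs the task four times and records the gap between consecutive starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original samples.
class FixedRateVsFixedDelay {

    // Runs a 50 ms task four times and returns the gaps (in ms) between consecutive starts.
    static List<Long> measureGaps(boolean fixedRate) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch fourRuns = new CountDownLatch(4);

        Runnable task = () -> {
            if (fourRuns.getCount() == 0) {
                return; // ignore any run that sneaks in before shutdown
            }
            starts.add(System.nanoTime());
            try {
                TimeUnit.MILLISECONDS.sleep(50); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            fourRuns.countDown();
        };

        if (fixedRate) {
            executor.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS);
        } else {
            executor.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS);
        }

        try {
            fourRuns.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }

        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < 4; i++) {
            gaps.add((starts.get(i) - starts.get(i - 1)) / 1_000_000);
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println("fixed rate  gaps (ms): " + measureGaps(true));
        System.out.println("fixed delay gaps (ms): " + measureGaps(false));
    }
}
```

On a lightly loaded machine the fixed rate gaps should be close to the 100 ms period, while the fixed delay gaps should be roughly the task time plus the delay (about 150 ms). The exact numbers vary from run to run.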


Synchronized block vs Synchronized method.

Use a synchronized block when only part of a method must be executed by one thread at a time. If the entire method must be thread safe, mark the method itself as synchronized. In both cases a thread must hold the lock on the object before it can run the block or method; every other thread that needs the same lock waits.
A class may have both synchronized and non-synchronized methods. The non-synchronized methods can still be called by other threads while one thread is executing a synchronized method.

Example 1:
-------------------
Using a synchronized method: try running this code after removing the synchronized keyword. You will usually get a different result on each run instead of the expected 40000.

package demo04synchronized;

/**
 * @author TWreddy
 *  When two threads invoke methods on the same object and those methods
 *  change instance variables of that object, the result is unpredictable.
 *  
 *  To avoid this we need to synchronize the common code so that only one thread 
 *  can run it at any time.
 *  
 *  Here is an example where the entire method is synchronized.
 *   
 *  With a synchronized method we always get the expected result.
 *  The calling thread acquires the lock on the object, so only one thread runs the method at a time.
 *  
 *  
 ****/
public class App {
   private int counter;
 public static void main(String[] args) {
  App app = new App();
  app.doWork();
 }
 
 //Without synchronized we will get different values each time.
 //Alternatively, an AtomicInteger could be used without the synchronized keyword.
 public synchronized void increment(){
  counter++;
 }
 
 public void doWork(){
  
  // Thread one calls increment
   Thread t1 = new Thread(new Runnable() {
    @Override
    public void run() {
     for(int i=0; i < 20000; i++){
      //Accessing directly both threads can get random values 
      //counter++;  
      increment();
     }
     
    }
   });
   
   
   //Thread two calls increment
   Thread t2 = new Thread(new Runnable() {
    @Override
    public void run() {
     for(int i=0; i < 20000; i++){
      //Accessing directly both threads can get random values
      //counter++;  
      increment();
     }
    }
   });
   
   
   //Kick start both threads.
   t1.start();
   t2.start();
   
   
   System.out.println("Ok waiting ");
   try {
    //Wait until both threads have finished.
    t1.join();
    t2.join();
    
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   //Without synchronized we will get different values each time.
   System.out.println(counter);
 }
}
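
The comment in the code above mentions AtomicInteger as an alternative to the synchronized keyword. Here is a minimal sketch of that idea. The class name AtomicCounterDemo and the method runTwoThreads() are made up for illustration and are not part of the downloadable samples.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the App class above, not part of the original samples.
class AtomicCounterDemo {

    // incrementAndGet() is atomic, so no synchronized keyword is needed.
    private final AtomicInteger counter = new AtomicInteger();

    // Starts two threads that each increment the counter 20000 times.
    int runTwoThreads() {
        Runnable work = () -> {
            for (int i = 0; i < 20000; i++) {
                counter.incrementAndGet();
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted while joining", e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(new AtomicCounterDemo().runTwoThreads()); // prints 40000
    }
}
```

This works well for a single counter. When several fields must change together, a synchronized method or block is still the simpler choice.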
Example using synchronized block.
---------------------------------------------


package demo05SynchronizedBlock;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author TWreddy
 *  When two threads invoke methods on the same object and those methods
 *  change instance variables of that object, the result is unpredictable.
 *  
 *  To avoid this we need to synchronize the common code so that only one thread 
 *  can run it at any time.
 *  
 *  Here is code which shows how to use a synchronized block (instead of synchronizing the entire method).
 *  
 */
public class Worker {
 
 private List<Integer> list1 = new ArrayList<Integer>();
 private List<Integer> list2 = new ArrayList<Integer>();
 private Random random = new Random();
 
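 private  void stageOne(){
  try{
   //sleep() is static, so call it on the Thread class rather than on an instance.
   Thread.sleep(1);
  }catch(InterruptedException exp){
   //Restore the interrupt flag instead of swallowing the exception.
   Thread.currentThread().interrupt();
  }
  //Or we could have said synchronized(list1){}
  list1.add(random.nextInt(100));
  
 }
 
 private  void stageTwo(){
  try{
   Thread.sleep(1);
  }catch(InterruptedException exp){
   //Restore the interrupt flag instead of swallowing the exception.
   Thread.currentThread().interrupt();
  }
  //Or we could have said synchronized(list2){}
  list2.add(random.nextInt(100));
  
 }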
  private void process(){
  for(int i=0; i < 1000; i++){
   //Other steps that do not touch shared state can go here, outside the synchronized block,
   //because it does not matter if multiple threads run them at the same time.
   //Lock on the current object.
    synchronized (this) {
    //The thread holding the lock runs both stages as one unit.
     stageOne();
     stageTwo();
   }
   
  }
  }
  
 public void main(){
  System.out.println("...Starting ......");
  
  long start = System.currentTimeMillis();
  //process();
  Thread t1 = new Thread(new Runnable() {
   @Override
   public void run() {
    process();
   }
  });
  
  
  Thread t2 = new Thread(new Runnable() {
   @Override
   public void run() {
    process();
   }
  });
  
  t1.start();
  t2.start();
  
  try {
   // Wait for t1 to finish.
   t1.join();
   System.out.println(" t1.join() returned."+ new Date());
   // Wait for t2 to finish.
   t2.join();
   System.out.println(" t2.join() returned."+ new Date());
  } catch (InterruptedException e) {
   // Restore the interrupt flag instead of silently swallowing the exception.
   Thread.currentThread().interrupt();
  }
  
  
  long end = System.currentTimeMillis();
  System.out.println("Total time="+ (end-start));
  System.out.println("L1="+ list1.size() +" L2="+ list2.size());
  
 }
}

public class App {
 
 public static void main(String[] args) {
  Worker worker = new Worker();
  worker.main();
 }
}
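
The comments in stageOne() and stageTwo() hint that each list could be guarded by its own lock. Here is a minimal sketch of that variant. The class name TwoLockWorker and the dedicated lock objects lock1 and lock2 are assumptions made for illustration; they are not part of the downloadable samples. Note that, unlike synchronized (this) above, the two stages are no longer executed as one unit: one thread can be in stageOne() while another is in stageTwo().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical variant of Worker, not part of the original samples.
class TwoLockWorker {

    private final Object lock1 = new Object();
    private final Object lock2 = new Object();
    private final List<Integer> list1 = new ArrayList<>();
    private final List<Integer> list2 = new ArrayList<>();

    private void stageOne() {
        synchronized (lock1) { // guards list1 only
            list1.add(ThreadLocalRandom.current().nextInt(100));
        }
    }

    private void stageTwo() {
        synchronized (lock2) { // guards list2 only
            list2.add(ThreadLocalRandom.current().nextInt(100));
        }
    }

    private void process() {
        for (int i = 0; i < 1000; i++) {
            stageOne();
            stageTwo();
        }
    }

    // Runs process() on two threads and returns {list1.size(), list2.size()}.
    int[] runTwoThreads() {
        Thread t1 = new Thread(this::process);
        Thread t2 = new Thread(this::process);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted while joining", e);
        }
        return new int[] { list1.size(), list2.size() };
    }

    public static void main(String[] args) {
        int[] sizes = new TwoLockWorker().runTwoThreads();
        System.out.println("L1=" + sizes[0] + " L2=" + sizes[1]); // prints L1=2000 L2=2000
    }
}
```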

Calling of wait(), notify(), notifyAll() in a synchronized context

  • wait() tells the current thread (the thread executing code inside a synchronized method or block) to give up the monitor and go to the waiting state.
  • notify() wakes up a single thread that is waiting on this object's monitor.
  • notifyAll() wakes up all the threads that called wait() on the same object.
All three must be called by a thread that holds the object's monitor; otherwise an IllegalMonitorStateException is thrown.

In this example we have:
 1. Producer - When the list is not full, the producer adds data to the list and calls notify(), which wakes up the consumer if it is in wait().
 2. Producer - When the list is full, the producer calls wait() on the list and waits until the consumer calls notify() on it.
 3. Consumer - When the list is not empty, the consumer removes an item and calls notify() on the list. This wakes up the producer if it is waiting.
 4. Consumer - When the list is empty, the consumer calls wait() and wakes up when the producer calls notify().

When a thread calls wait() it releases the lock on that object (it keeps any locks it holds on other objects) and goes to the WAITING state. A notify() on the same object tells it that the state may have changed. The woken thread must first re-acquire the lock, competing with any other threads that want it, and only then continues with the code after wait().

Let's test with an example.
Here is a simple class that produces random integers and stores them in a list. When the list is full (here the limit is 5) the producer waits for the size to go down and does not add any more data. When the list is empty the consumer calls wait().

package demo09producerConsumerWaitNotify;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * @author twreddy
 */
public class Processor{
 
 private List<Integer> numList = new ArrayList<Integer>();
 private static final int MAX_SIZE = 5;
 
 public void produce() throws InterruptedException{
  Random random = new Random();
   synchronized (numList) {
    if(numList.size() < MAX_SIZE){
     numList.add(random.nextInt(100));
     System.out.println("PRODUCER-List Size ="+ numList.size());
     //if we had said synchronized(this) then we can call notify() on "this" object directly.
     numList.notify();  
    }else {
     System.out.println("PRODUCER-Waiting as Queue is full"+ numList.size());
     //wait() releases the lock on numList (locks held on other objects are kept)
     //and puts this thread in the WAITING state until notify() is called on numList.
     numList.wait();
    }
   }
   Thread.sleep(1000);
 }
 
 public void consume() throws InterruptedException{
  
   synchronized (numList) {
    if(numList.size() == 0){
     System.out.println("CONSUMER Waiting as there No data++++");
     //Release the lock and wait until notified; the lock is re-acquired before wait() returns.
     numList.wait();
    }else {
     Integer num = numList.remove(0);
     System.out.println("CONSUMER Read "+ num +" size="+ numList.size());
     numList.notify();
    }
   }
   
   
   Thread.sleep(5000);
   
  }
 
}
Now let's start the producer and the consumer as threads.
package demo09producerConsumerWaitNotify;

/**
 * @author twreddy
 *
 *    A simple example of Producer consumer simulated using wait() and notify() methods.
 */
public class App {
 
 public static void main(String[] args)throws Exception {
  final Processor proc = new Processor();
  
  Thread t1 = new Thread(new Runnable() {
   
   @Override
   public void run() {
    while(true ){
     try {
      proc.produce();
     } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
     }
    }
    
   }
  });
  

  Thread t2 = new Thread(new Runnable() {
   
   @Override
   public void run() {
    while(true ){
     try {
      proc.consume();
     } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
     }
    }
    
   }
  });
  
  t1.start();
  t2.start();
 }
}
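
The example above checks the list size with if and relies on the outer while(true) loop to call produce() or consume() again. A common alternative is to re-check the condition in a while loop around wait(), so that a thread that wakes up too early simply waits again. Here is a minimal sketch of that pattern. The class BoundedBuffer and its methods put(), take() and transfer() are made up for illustration and are not part of the downloadable samples.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical bounded buffer, not part of the original samples.
class BoundedBuffer {

    private final Queue<Integer> queue = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(int value) throws InterruptedException {
        while (queue.size() == capacity) {
            wait(); // releases the lock on this buffer until notified
        }
        queue.add(value);
        notifyAll(); // wake up consumers waiting for data
    }

    synchronized int take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        int value = queue.remove();
        notifyAll(); // wake up producers waiting for free space
        return value;
    }

    // Sends 1..count through a buffer of size 5 and returns the sum seen by the consumer.
    static int transfer(int count) {
        BoundedBuffer buffer = new BoundedBuffer(5);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int sum = 0;
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
            return sum;
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(transfer(100)); // prints 5050
    }
}
```

notifyAll() is used here because producers and consumers wait on the same monitor; with notify() a producer could wake another producer instead of a consumer.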

The Volatile keyword in Java Threads

Why should a variable be marked volatile? (To tell each thread not to work with a cached copy and instead re-read the value from main memory each time.)
Here is an example of when to use volatile.
Without the volatile keyword, on some Java platforms the value of the variable "running" (declared as "private volatile boolean running" in the code below) may be cached by the thread that runs the loop, so that thread never sees the update made by another thread and the loop never ends. Marking the variable volatile guarantees that a write by one thread is visible to later reads by other threads.
This way the code is guaranteed to work on all implementations of Java.

package demo04VolatileKeyword;

import java.util.Scanner;

/* 
 * Without the volatile keyword, on some Java platforms the value of
 *  "running" may be cached by the looping thread, so the loop never ends.
 *  Marking the field volatile guarantees that the write made in shutDown()
 *  is visible to the thread executing run().
 *  
 *  This way the code is guaranteed to work on all implementations of Java.
 */
class Processor extends Thread{
 
 //volatile: every read of this field sees the latest value written by any thread,
 //instead of a value cached by the reading thread.
 private volatile boolean running = true;
 public void run(){
  
  while (running){
   System.out.println("Type some text and press Enter key to Stop.");
   
   try{
    sleep(100);
   }
   catch(Exception exp){
    exp.printStackTrace();
   }
  }
  
 }
 
 public void shutDown(){
  running = false;
 }
}

public class App4 {

 public static void main(String[] args) {
  Processor proc = new Processor();
  proc.start();
  Scanner scan = new Scanner(System.in);
  scan.nextLine();
  proc.shutDown();
  
 }
 
}
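
If you want to try the same idea without typing into the console, here is a minimal sketch that stops the worker from code. The class StoppableWorker and the helper startThenStop() are made up for illustration and are not part of the downloadable samples.

```java
// Hypothetical variant of Processor, not part of the original samples.
class StoppableWorker extends Thread {

    // volatile makes the write in shutDown() visible to the thread running run().
    private volatile boolean running = true;
    private long iterations;

    @Override
    public void run() {
        while (running) {
            iterations++; // busy work; no sleep or I/O inside the loop
        }
    }

    void shutDown() {
        running = false;
    }

    // Starts a worker, asks it to stop after 100 ms and reports whether it really stopped.
    static boolean startThenStop() {
        StoppableWorker worker = new StoppableWorker();
        worker.start();
        try {
            Thread.sleep(100);
            worker.shutDown();
            worker.join(2000); // wait at most 2 seconds for the loop to end
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted", e);
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped=" + startThenStop()); // prints stopped=true
    }
}
```

If you remove the volatile keyword from this sketch, the loop may keep running on some JVMs, because nothing then forces the worker thread to see the new value.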

CountDownLatch

CountDownLatch lets one or more threads wait until a set of operations running in other threads completes, that is, until countDown() has been called a given number of times and the count reaches zero.
A CountDownLatch cannot be reused once the count has reached zero.
Eg: "CountDownLatch latch = new CountDownLatch(3);" Each thread calls "latch.countDown();" when it finishes, which reduces the count by one. A thread that calls "latch.await()" (usually the main thread) blocks until the count reaches zero or until it is interrupted.
So when "latch.await();" returns normally, the main thread knows that three threads have called latch.countDown() and it continues. (This is roughly the opposite of a Semaphore, which hands out permits and is discussed later. If you need a reusable variant, try "CyclicBarrier".)

CountDownLatch works on the latch principle: the waiting thread stays blocked until the gate opens.
  • The count is specified when the CountDownLatch is created.
  • Any thread that calls CountDownLatch.await(), usually the main thread of the application, waits until the count reaches zero or it is interrupted by another thread.
  • The other threads call CountDownLatch.countDown() once they are completed or ready.
  • As soon as the count reaches zero, the waiting thread starts running again.
  • The latch is not reusable: once the count reaches zero, await() returns immediately and the count cannot be reset.

Example:
In this example we submit six tasks, but the latch is created with a count of 3. After the first three tasks call countDown() the count reaches zero and await() returns, while the executor keeps running the remaining tasks. See the code.

package demo07countdownLatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author twreddy
 *  CountDownLatch  Example.
 *  
 *****/
class Processor implements Runnable{

 private CountDownLatch latch;
 private int threadId;
 
 public Processor(CountDownLatch latch, int threadId){
  this.latch = latch;
  this.threadId = threadId;
 }
 
 public void setLatch(CountDownLatch latch){
  this.latch = latch;
 }
 
 @Override
 public void run() {
  // TODO Auto-generated method stub
  System.out.println(" starting" + Thread.currentThread().getName() +" ThreadId="+ threadId);
  try {
   Thread.sleep(2000);
  } catch (Exception e) {
   // TODO: handle exception
  }
  
  System.out.println(" ending" + Thread.currentThread().getName() +" ThreadId="+ threadId);
  
  latch.countDown();
 }
 
 
}
public class App {
 
 public static void main(String[] args) {
 
  ExecutorService executor = Executors.newFixedThreadPool(2);
  CountDownLatch latch = new CountDownLatch(3);
  for(int i=0; i < 6; i ++){
   executor.submit(new Processor(latch, i));
  }
  
  try {
   /****
    *  Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted. 
    If the current count is zero then this method returns immediately. 
    ****/
   latch.await();
   //After three tasks have called countDown() the count is zero and this prints, but the executor keeps running the remaining tasks.
   System.out.println(" === I am printed once the count is reached zero.  =====");
  } catch (Exception e) {
   e.printStackTrace();
  }
  
  executor.shutdown();
 }
 
}
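
If the main thread should wait for all submitted tasks rather than just the first three, the count has to match the number of tasks. Here is a minimal sketch of that variant. The class LatchDemo and the method runAndCount() are made up for illustration and are not part of the downloadable samples; the 5 second timeout is an arbitrary value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original samples.
class LatchDemo {

    // Submits `tasks` tasks and returns how many had completed when await() returned.
    static int runAndCount(int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(tasks); // one count per task
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                try {
                    completed.incrementAndGet(); // the "work"
                } finally {
                    latch.countDown(); // count down even if the work fails
                }
            });
        }

        try {
            // The timed await() avoids waiting forever if a task never counts down.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for tasks");
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted", e);
        } finally {
            executor.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCount(6)); // prints 6
    }
}
```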

CyclicBarrier:

 A CyclicBarrier is a synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed-size party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be reused after the waiting threads are released. (CyclicBarrier is similar to CountDownLatch, except that a CyclicBarrier can be reused.)

CountDownLatch vs CyclicBarrier:
  • A CountDownLatch cannot be reused after the count reaches zero. A CyclicBarrier resets automatically once all parties have arrived, so it can be reused.
  • With a CountDownLatch, the threads that count down and the threads that wait are usually different. With a CyclicBarrier, the threads that arrive at the barrier are the same threads that wait there for each other.
  • As with CountDownLatch, the count (the number of parties) is specified when the CyclicBarrier is created.

CyclicBarrier barrier = new CyclicBarrier(3);

When a thread calls barrier.await(); the number of parties still awaited goes down by 1 and the thread waits until all the other parties have arrived.
When all threads (in this case 3 threads) have called await(), they are all released and continue executing.
If you need to run a specific action once all threads have called await(), pass a Runnable (the barrier action) as the second constructor argument:

CyclicBarrier barrier = new CyclicBarrier(3, new Runnable(){
           @Override
            public void run(){
                //This run() method  will be fired when all 3 threads reaches the barrier
                System.out.println("All Threads have called Await() . Good to go.");
            }
        });

Here is full code in Action.:

package demo07CyclicBarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {

    //A simple Runnable task for each thread
     static class Task implements Runnable {

        private CyclicBarrier barrier;

        public Task(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " is waiting on barrier by calling barrier.await() ");
                //Waits until all threads have invoked await() on this barrier.
                barrier.await();
               
                //In case you want to specify time limit.
                //barrier.await(10, TimeUnit.SECONDS);
                
                System.out.println(Thread.currentThread().getName() + " Moved beyond barrier.await() ");
            } catch (InterruptedException|BrokenBarrierException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String args[]) {

        //creating CyclicBarrier with 3 parties i.e. 3 Threads needs to call await()
     //final CyclicBarrier cb = new CyclicBarrier(3);
     
     //In case you want some action after Barrier count is zero you can 
     // add a runnable task too as shown here.
     final CyclicBarrier cb = new CyclicBarrier(3, new Runnable(){
            @Override
            public void run(){
                //This task will be executed once all thread reaches barrier
                System.out.println("All Thread finished calling  barrier.await();. So if there is any generic stuff you want to do. Add code in this run method. ");
            }
        });
  
     
        //starting each of thread
        Thread t1 = new Thread(new Task(cb), "CB Test Thread 1");
        Thread t2 = new Thread(new Task(cb), "CB Test Thread 2");
        Thread t3 = new Thread(new Task(cb), "CB Test Thread 3");

        t1.start();
        t2.start();
        t3.start();
      
    }
}
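
The example above passes the barrier only once. To show the "cyclic" part, here is a minimal sketch in which the same barrier is used for several rounds. The class BarrierReuseDemo and the method runRounds() are made up for illustration and are not part of the downloadable samples.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original samples.
class BarrierReuseDemo {

    // Each of the `parties` threads passes the same barrier `rounds` times.
    // Returns how many times the barrier action ran.
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per round, when the last party arrives.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> trips.incrementAndGet());

        Runnable worker = () -> {
            try {
                for (int r = 0; r < rounds; r++) {
                    barrier.await(); // the barrier resets itself after every round
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(worker);
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted", e);
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(runRounds(3, 2)); // prints 2
    }
}
```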

ReentrantLock

It is like synchronized, with some additional capabilities.

A ReentrantLock is a mutual exclusion lock with the same basic behavior and semantics as the implicit monitor lock accessed using
synchronized methods and statements, but with extended capabilities. Locks support various methods for finer-grained lock control and are thus more expressive than implicit monitors.

  {

  eg:
  Lock rentLock = new ReentrantLock(); 
  rentLock.lock();
  try {
   ... Access the critical resource
   ....
  } finally {
   rentLock.unlock();
  }
  }


Here is an example.
This class starts two threads that both invoke methods on the same object and thus access a common resource.
We will synchronize the access using a ReentrantLock.

package demo10RentrantLocks;

/**
 * @author twreddy
 *  Shows how to use a ReentrantLock with the basic lock() and unlock() methods.
 */
public class App {
 
 public static void main(String[] args) throws Exception{
  
  final Runner r = new Runner();
  Thread t1 = new Thread(new Runnable() {
   @Override
   public void run() {
    r.firstThread();
   }
  });
  
  Thread t2 = new Thread(new Runnable() {
   @Override
   public void run() {
    r.secondThread();
   }
  });
  
  
  t1.start();
  t2.start();
  
  t1.join();
  t2.join();
  
  //Just printing a message so that we know the threads are done and 
  //we are here.
  r.finished();
 }
}
And here is the class that uses the ReentrantLock to synchronize access.

package demo10RentrantLocks;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author TWreddy
 *  A very basic example: the lock makes the increment loop behave atomically,
 *  because only the thread that holds the lock runs increment().
 *  Explicit locking gives more fine-grained control than the synchronized keyword.
 */
public class Runner {

 private int counter = 0;
 private Lock lock = new ReentrantLock();
 
 public void increment(){
  for(int i=0; i < 20000; i++){
   counter++;
  }
 }
 
 public void firstThread(){
  //Works like synchronized, but the explicit lock gives more fine-grained control
  //(for example tryLock(), timed and interruptible lock attempts).
  lock.lock();
  try{
   increment();
  }
  finally{
  //always do this in finally so that the lock is released at all costs.
   lock.unlock();
  }
 }

 public void secondThread(){
  lock.lock();
  try {
   increment();
  }
  finally{
   lock.unlock();
  }
 }
 
 public void finished(){
  System.out.println(" Count is ="+ counter);
 }
}
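
One of the "additional capabilities" is that a thread does not have to block forever. Here is a minimal sketch using tryLock() with a timeout. The class TryLockDemo and its methods doWorkIfFree() and demo() are made up for illustration and are not part of the downloadable samples; the 100 ms timeout is an arbitrary value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original samples.
class TryLockDemo {

    private final ReentrantLock lock = new ReentrantLock();

    // Returns true if the lock was acquired within the timeout and the work was done.
    boolean doWorkIfFree(long timeoutMillis) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // another thread holds the lock; give up instead of blocking
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            // ... access the critical resource here ...
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Tries once while the lock is free and once while another thread holds it.
    static boolean[] demo() {
        TryLockDemo d = new TryLockDemo();
        boolean whenFree = d.doWorkIfFree(100);

        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            d.lock.lock();
            try {
                held.countDown();  // tell the main thread that the lock is taken
                release.await();   // keep the lock until told to let go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                d.lock.unlock();
            }
        });
        holder.start();

        try {
            held.await();
            boolean whenBusy = d.doWorkIfFree(100);
            release.countDown();
            holder.join();
            return new boolean[] { whenFree, whenBusy };
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted", e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("free=" + r[0] + " busy=" + r[1]); // prints free=true busy=false
    }
}
```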

ReentrantLock with Condition

The Condition interface factors out the java.lang.Object monitor methods (wait(), notify(), and notifyAll())  into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations.

Where Lock replaces synchronized methods and statements, Condition replaces Object monitor methods.

  { 
e.g:
 private Lock lock = new ReentrantLock();
 private Condition condition = lock.newCondition();
 condition.await();  //Releases the lock and waits for a signal, much like wait()
 condition.signal(); //Wakes up one thread waiting on this condition, much like notify()

  }


Condition declares the following methods:
- void await() forces the current thread to wait until it's signaled or interrupted.
- boolean await(long time, TimeUnit unit) forces the current thread to wait until it's signaled or interrupted, or the specified waiting time elapses.
- long awaitNanos(long nanosTimeout) forces the current thread to wait until it's signaled or interrupted, or the specified waiting time elapses.
- void awaitUninterruptibly() forces the current thread to wait until it's signaled.
- boolean awaitUntil(Date deadline) forces the current thread to wait until it's signaled or interrupted, or the specified deadline elapses.
- void signal() wakes up one waiting thread.
- void signalAll() wakes up all waiting threads.

Code example using Condition with Re-entrant lock.


package demo10RentrantLocksWithCondition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author twreddy
 * Shows how to use a ReentrantLock with lock() and unlock()
 * together with a Condition:
 *
 *  private Lock lock = new ReentrantLock();
 *  private Condition condition = lock.newCondition();
 *
 *  condition.await()  //Releases the lock and waits for the signal
 *  condition.signal() //Wakes up one thread that is waiting on this condition
 */
 
public class App1 {
 
 public static void main(String[] args) throws Exception{
  final Runner1 r = new Runner1();
  Thread t1 = new Thread(new Runnable() {
   @Override
   public void run() {
    r.firstThread();
   }
  });
  
  Thread t2 = new Thread(new Runnable() {
   @Override
   public void run() {
    try{
     r.secondThread();
    }catch(Exception exp){
     exp.printStackTrace();
    }
   }
  });
  
  t1.start();
  t2.start();
  //Wait for threads to finish.
  t1.join();
  t2.join();
  
  r.finished();
 }
}
Here is the code using Condition with await() and signal()
package demo10RentrantLocksWithCondition;

import java.util.Scanner;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author TWreddy
 *  Uses a ReentrantLock (lock() and unlock()) together with a Condition object.
 *  
 *  
 *  condition.await() releases the lock, even though the thread acquired it with lock() earlier.
 *  condition.signal() tells a thread waiting on that condition that it can 
 *  proceed once it has re-acquired the lock (this happens behind the scenes).
 */
public class Runner1 {

 private int counter = 0;
 private ReentrantLock lock = new ReentrantLock();
 private Condition condition = lock.newCondition();
 
 public void increment(String whoCalled){
  for(int i=0; i < 1000; i++){
   counter++;
  }
  System.out.println(" Increment was executed by ="+ whoCalled);
 }
 
 public void firstThread(){
  //Better than using synchronized as it give more fine grained control.
  // get lock...
  lock.lock(); 
  
  try{
   System.out.println(" Sleeping in First, Count="+  lock.getHoldCount());
   
   /***
    * Causes the current thread to wait until it is signaled or thread is interrupted.
    * The lock associated with this {@code Condition} is atomically
    * released and the current thread becomes disabled for thread scheduling.
    * 
    * so the previous lock we got will be released and will be waiting here
    * In all cases, before this method can return the current thread must
    * re-acquire the lock associated with this condition. When the
    * thread returns it is guaranteed to hold this lock.
    ***/
   //Lock will be released and will wait until signaled and will try to get lock again.
   condition.await();
   
   System.out.println(" Woke up in First. Lock Count="+ lock.getHoldCount());
   increment("firstThread");
  }
  catch(Exception exp){
   exp.printStackTrace();
  }
  finally{
   //always do this in finally so that the lock is released at all costs.
   lock.unlock();
   
  }
 }
 
 public void secondThread() throws Exception{
  
  Thread.sleep(2000);
  System.out.println(" Lock count before in Second thread="+ lock.getHoldCount());
  lock.lock();
  //Everything after lock() goes inside try so that the lock is always released in finally.
  try {
   System.out.println(" Lock count after lock()  in Second thread="+ lock.getHoldCount());
   
   readSomeInput();
   
   /* If any threads are waiting on this condition then one
    * is selected for waking up. That thread must then re-acquire the
    * lock before returning from await, so it cannot proceed
    * until this thread calls unlock() below.
    */
   condition.signal();
   //If you forget to call signal() the other thread will wait forever.
   System.out.println(" Lock count After signal() in Second thread="+ lock.getHoldCount());
   
   increment("secondThread");
  }
  finally{
   lock.unlock();
   
  }
 }


 private void readSomeInput() {
  System.out.println(" Enter :?");
  Scanner scan = new Scanner(System.in);
  String str  = scan.nextLine();
  scan.close();
  System.out.println(" Got it... str="+ str);
 }
 
 public void finished(){
  System.out.println(" Count is ="+ counter);
 }
}
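
The example above relies on the first thread reaching await() before the second thread calls signal() (that is what the 2 second sleep is for). A more robust pattern is to pair the Condition with a flag and to re-check the flag in a loop, so that a signal sent too early is not lost. Here is a minimal sketch of that pattern. The class ReadyGate and its methods awaitReady(), markReady() and demo() are made up for illustration and are not part of the downloadable samples.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original samples.
class ReadyGate {

    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean isReady; // guarded by lock

    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!isReady) {   // re-check the flag after every wake-up
                ready.await();   // releases the lock while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    void markReady() {
        lock.lock();
        try {
            isReady = true;
            ready.signalAll();   // wake up every thread waiting on this condition
        } finally {
            lock.unlock();
        }
    }

    // Works no matter whether markReady() runs before or after awaitReady().
    static boolean demo() {
        ReadyGate gate = new ReadyGate();
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.markReady();
        try {
            waiter.join(2000);
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted", e);
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished=" + demo()); // prints waiter finished=true
    }
}
```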

ReadWriteLock

ReadWriteLock - Many threads can hold the read lock at the same time, but only one thread can hold the write lock, and only while no thread holds the read lock.
Eg: ReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
lock.writeLock().unlock();
lock.readLock().lock();
lock.readLock().unlock();


package chap10ReadWriteLock;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author twreddy
 * ReadWriteLock - Many threads can hold the read lock at the same time, but only one thread can hold the write lock.
 ***/
public class TestLock {
 
 Map<String, String> map = new HashMap<>();
 ReadWriteLock lock = new ReentrantReadWriteLock();
 int count = 0;
 public static void main(String[] args) {
  
  ExecutorService executor = Executors.newFixedThreadPool(3);
  

  TestLock testreadWrite = new TestLock();
  Runnable writeTask = testreadWrite.getWriteTask();
  Runnable readTask = testreadWrite.getReadTask();
  

  executor.submit(writeTask);
  
  //If the write task gets the lock first, both read tasks wait until it releases the write lock.
  //Then both read tasks run at the same time, because the read lock can be shared.
  //The same Runnable can be submitted twice; each submission may run on a different pool thread.
  executor.submit(readTask);
  executor.submit(readTask);
  
  stop(executor);
 }
 
 
 
 public Runnable getWriteTask(){
  
  Runnable writeTask = () -> {
   
      lock.writeLock().lock();
      try {
       
           map.put("foo", "bar"+ count);
           System.out.println("========> Writing to map by write task"+ Thread.currentThread().getName() +" date="+ new Date() );
           TimeUnit.SECONDS.sleep(3);
           count++;
      } 
      catch(Exception exp){
       exp.printStackTrace();
      }
      finally {
          lock.writeLock().unlock();
      }
  };
  
  return writeTask;
 }
 
 
 public Runnable getReadTask(){
  
  Runnable readTask = () -> {
      lock.readLock().lock();
      try {
       System.out.println("==> Reading by  task="+ Thread.currentThread().getName() +" date="+ new Date());
          System.out.println(map.get("foo"));
          TimeUnit.SECONDS.sleep(3);
          System.out.println("==> Reading done by task="+ Thread.currentThread().getName() );
      }catch(Exception exp){
       exp.printStackTrace();
      }
      finally {
          lock.readLock().unlock();
      }
  };
  
  return readTask;
 }
 
 
 
 
 public static void stop(ExecutorService executor) {
        try {
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            System.err.println("termination interrupted");
        }
        finally {
            if (!executor.isTerminated()) {
                System.err.println("killing non-finished tasks");
            }
            executor.shutdownNow();
        }
    }
}
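
The claim in the comments of main() above (readers share the lock, writers are exclusive) can be checked in isolation. The following is only a minimal sketch, not part of the downloadable samples; the class name ReadWriteSketch and the method demo() are made up for illustration. It uses tryLock() so that nothing blocks.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteSketch {

    // Returns "<second reader admitted>,<writer admitted>" while one reader holds the lock.
    public static String demo() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        boolean[] admitted = new boolean[2];

        lock.readLock().lock(); // the calling thread is the first reader
        try {
            Thread other = new Thread(() -> {
                // A second reader is admitted because no writer holds the lock.
                admitted[0] = lock.readLock().tryLock();
                if (admitted[0]) {
                    lock.readLock().unlock();
                }
                // A writer is refused while another thread still holds the read lock.
                admitted[1] = lock.writeLock().tryLock();
                if (admitted[1]) {
                    lock.writeLock().unlock();
                }
            });
            other.start();
            other.join(); // join() makes the other thread's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.readLock().unlock();
        }
        return admitted[0] + "," + admitted[1];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true,false
    }
}
```

In the tutorial's sample the tasks use the blocking lock() calls instead, so the readers simply wait for the writer rather than giving up.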


StampedLock

Java 8 ships with a new kind of lock called StampedLock, which also supports read and write locks just like in the example above. In contrast to ReadWriteLock, the locking methods
of a StampedLock return a stamp represented by a long value. You can use these stamps to either release a lock or to check whether the lock is still valid. Additionally, stamped locks support another lock mode called optimistic locking.
Keep in mind that stamped locks are not reentrant. Each call to lock returns a new stamp and blocks if no lock is available, even if the same thread already holds a lock.
So you have to pay particular attention not to run into deadlocks.

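{
 StampedLock lock = new StampedLock();
 // Write lock: keep the stamp and hand it back when unlocking.
 long writeStamp = lock.writeLock();
 ....
 lock.unlockWrite(writeStamp);
 // Read lock: a separate stamp, released with unlockRead() (or the generic unlock()).
 long readStamp = lock.readLock();
 ....
 ....
 lock.unlockRead(readStamp);
 }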
Here is the sample code to test Stamped Locks.


package chap11StampedLock;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

/**
 * @author twreddy
 * Keep in mind that stamped locks don't implement reentrant characteristics. Each call to lock returns a 
 * new stamp and blocks if no lock is available even
 * if the same thread already holds a lock. So you have to pay particular attention not to run into deadlocks.
 **/
public class TestLock {
 
 Map<String, String> map = new HashMap<>();
 StampedLock lock = new StampedLock();
 int count = 0;
 public static void main(String[] args) {
  
  ExecutorService executor = Executors.newFixedThreadPool(3);
  TestLock testreadWrite = new TestLock();
  Runnable writeTask = testreadWrite.getWriteTask();
  Runnable readTask = testreadWrite.getReadTask();
  executor.submit(writeTask);
  
  // Both read tasks block while the write task holds the write lock.
  // Once it is released, both readers acquire the read lock and run concurrently.
  // The same Runnable can be submitted twice; each submission may run on a different pool thread.
  executor.submit(readTask);
  executor.submit(readTask);
  
  stop(executor);
 }
 
 public Runnable getWriteTask(){
  
  Runnable writeTask = () -> {
   
      long stamp = lock.writeLock(); // Blocking call.
      try {
       
           map.put("foo", "bar"+ count);
           System.out.println("========> Writing to map by write task "+ Thread.currentThread().getName() +" date="+ new Date() );
          
           TimeUnit.SECONDS.sleep(3);
           count++;
      } 
      catch(Exception exp){
       exp.printStackTrace();
      }
      finally {
       System.out.println("WriteLock stamp = "+stamp);
          lock.unlockWrite(stamp);
      }
  };
  
  return writeTask;
 }
 
 public Runnable getReadTask(){
  
  Runnable readTask = () -> {
   long stamp = lock.readLock(); // Blocking call: waits until the read lock is granted.
   // Alternative: lock.tryOptimisticRead() returns immediately; the stamp is zero if a write lock is currently held.
   
   /*
     An optimistic read is started by calling tryOptimisticRead(), which always returns a stamp without
     blocking the current thread, whether or not the lock is available. If a write lock is already active,
     the returned stamp is zero. You can check whether a stamp is still valid by calling lock.validate(stamp).
    
     When working with optimistic reads you therefore have to validate the stamp after reading any shared
     mutable variable, to make sure no write happened in the meantime.
    
     Sometimes it is useful to convert a read lock into a write lock without unlocking and locking again.
     StampedLock provides tryConvertToWriteLock(stamp) for this. It does not block; it returns zero if the
     conversion is not possible right now.
    */
   
      try {
       System.out.println("==> Reading by  task="+ Thread.currentThread().getName() +" date="+ new Date());
          System.out.println(map.get("foo"));
          TimeUnit.SECONDS.sleep(3);
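          // The stamp here comes from readLock(), not tryOptimisticRead(); it stays valid while the read lock is held.
          System.out.println("Read stamp still valid : " + lock.validate(stamp));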
          System.out.println("==> Reading done by task="+ Thread.currentThread().getName() );
      }catch(Exception exp){
       exp.printStackTrace();
      }
      finally {
          lock.unlock(stamp);
      }
  };
  
  return readTask;
 }
 
 
 public static void stop(ExecutorService executor) {
        try {
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            System.err.println("termination interrupted");
        }
        finally {
            if (!executor.isTerminated()) {
                System.err.println("killing non-finished tasks");
            }
            executor.shutdownNow();
        }
    }
 
}
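
The sample above only mentions optimistic reads in a comment. As a rough sketch of that mode (the class OptimisticReadSketch and its fields are hypothetical, chosen just for illustration), a reader can copy the shared fields without locking, validate the stamp, and fall back to a real read lock only if a write slipped in:

```java
import java.util.concurrent.locks.StampedLock;

class OptimisticReadSketch {

    private final StampedLock lock = new StampedLock();
    private int x;
    private int y;

    public void move(int dx, int dy) {
        long stamp = lock.writeLock(); // exclusive; invalidates outstanding optimistic stamps
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    public int sum() {
        long stamp = lock.tryOptimisticRead(); // never blocks; 0 if a writer is active
        int currentX = x;                      // copy the shared state first
        int currentY = y;
        if (!lock.validate(stamp)) {           // a write happened (or was in progress)
            stamp = lock.readLock();           // fall back to a blocking read lock
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return currentX + currentY;
    }

    public static void main(String[] args) {
        OptimisticReadSketch point = new OptimisticReadSketch();
        point.move(3, 4);
        System.out.println(point.sum()); // 7
    }
}
```

The important part is the order: read into local variables, then validate, and only then use the copies.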

Semaphores

A semaphore controls how many threads may use a resource at the same time.
Whereas a ReentrantLock usually grants exclusive access to a variable or resource, a semaphore maintains a whole set of permits.
It is like handing out tokens: once all tokens are taken, the next thread has to wait until one is released before it can proceed.

 Semaphore semaphore = new Semaphore(5);
 So only 5 threads can access a resource at a given time.

 boolean permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
  ...
  ...
  if(permit){
   semaphore.release();
  }
This is useful when you want to limit the amount of concurrent access to certain parts of your application.
(Loosely speaking, this is the opposite of a CountDownLatch, which holds threads back until a count reaches zero instead of capping how many may proceed.)
package chap12Semaphores;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * @author twreddy
 *  Semaphores in action.
 ****/

public class TestLock {
 
 /*
  * The executor can potentially run 10 tasks concurrently but we use a semaphore of size 5, 
  * thus limiting concurrent access to 5
  */
 
 Semaphore semaphore = new Semaphore(5);
 
 public static void main(String[] args) {
  
  ExecutorService executor = Executors.newFixedThreadPool(10);
  TestLock testreadWrite = new TestLock();
  
  Runnable writeTask = testreadWrite.getWriteTask();
  
  /*
  executor.submit(writeTask);
  executor.submit(writeTask);
  executor.submit(writeTask);
  executor.submit(writeTask);
  executor.submit(writeTask);
  ...
  
  ...10 times 
  */
  
  //or in one line submit 10 tasks
  IntStream.range(0, 10).forEach(i->executor.submit(writeTask));
  
  stop(executor);
 }
 
 
 
 public Runnable getWriteTask(){
  
  Runnable longRunningTask = () -> {
      boolean permit = false;
      try {
       //Try to get a permit from the semaphore; wait up to 1 second for one.
          permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
          if (permit) {
              System.out.println("Semaphore acquired");
              sleep(5);
          } else {
              System.out.println("Could not acquire semaphore");
          }
      } catch (InterruptedException e) {
          throw new IllegalStateException(e);
      } finally {
          if (permit) {
              semaphore.release();
          }
      }
  };
  
  return longRunningTask;
 }
 
 
 
 public static void stop(ExecutorService executor) {
        try {
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            System.err.println("termination interrupted");
        }
        finally {
            if (!executor.isTerminated()) {
                System.err.println("killing non-finished tasks");
            }
            executor.shutdownNow();
        }
    }

 
 
 public static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
 
}
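
To see the limit itself rather than the log lines, one can count how many tasks are inside the guarded region at once. This is only a sketch under my own assumptions: the class SemaphoreLimitSketch, the method maxObservedConcurrency() and the 50 ms of simulated work are invented for illustration, and it uses the blocking acquire() instead of tryAcquire() so that every task eventually runs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitSketch {

    // Runs `tasks` tasks on as many threads and returns the highest number
    // that were ever inside the semaphore-guarded region at the same time.
    public static int maxObservedConcurrency(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(tasks);

        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                try {
                    semaphore.acquire(); // blocks until a permit is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the highest count seen
                    TimeUnit.MILLISECONDS.sleep(50);       // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release(); // always hand the permit back
                }
            });
        }

        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // With 3 permits the printed peak can never exceed 3, even with 10 threads.
        System.out.println(maxObservedConcurrency(3, 10));
    }
}
```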

DEAD LOCK


- What is a deadlock? A deadlock occurs when two threads each hold a lock on one resource and wait for the lock the other thread holds, so neither can ever proceed.
E.g., say we have two lists (list1 and list2).
The first thread locks list1 and then list2; the second thread locks list2 and then list1. Once each thread holds its first lock, neither will ever get both locks, because the locks are taken in different orders.
Here is a deadlock in action in the sample code.




package demo11deadlock;

import java.util.ArrayList;
import java.util.List;

/******
 * @author twreddy
 *
 * In one thread get lock on list1 and list2 in the order I specified.
 * In second thread get lock on list2 and list1 . 
 * Both thread will never get locks on both objects due to different orders.
 *****/
public class Processor {
  private List<Integer>  list1 = new ArrayList<Integer>();
  private List<Integer>  list2 = new ArrayList<Integer>();
  
   public void firstMethod() throws InterruptedException{
    System.out.println("Entering firstMethod . Thread= "+ Thread.currentThread().getName());
    synchronized (list1) {
     System.out.println("FirstMethod got lock on list1 ");
     Thread.sleep(3000);
     System.out.println("FirstMethod waiting for lock on list2 ? ZZZZZzzzzzzzz ");
     synchronized (list2) {
      System.out.println("Entering FirstMethod - Got lock on list2");
      for(int i=0; i < 10000;i++){
       list1.add(i%2);
       list2.add(i*2);
      }
     }
    }
    System.out.println(" F1-Exit");
   }

   public void secondMethod() throws InterruptedException{
    System.out.println("Entering secondMethod . Thread= "+ Thread.currentThread().getName());
    synchronized (list2) {
     System.out.println("secondMethod Got lock on list2 ");
     Thread.sleep(3000);
     System.out.println("secondMethod waiting for lock on list1 ? ZZZZZzzzzzzzz ");
     synchronized (list1) {
      System.out.println("Entering secondMethod - Got lock on list1");
      for(int i=0; i < 10000;i++){
       list1.add(i%2);
       list2.add(i*2);
      }
     }
    }
    System.out.println(" F2- Exit");
   }
 
} 
Now let's start two threads such that each tries to acquire the locks in a different order.

package demo11deadlock;
/**
 * @author twreddy
 * Demonstrating dead lock condition.
 * We have two lists (list1 and list2). 
 * In one thread get lock on list1 and list2 in the order I specified.
 * In second thread get lock on list2 and list1 . 
 * Both threads will never get locks on both objects due to different orders.
 * 
 ****/
public class App {

 public static void main(String[] args) throws InterruptedException {
  final Processor process = new Processor();
  
  Thread t1 = new Thread(new Runnable(){

   @Override
   public void run() {
   
    try {
     process.firstMethod();
    } catch (InterruptedException e) {
    
     e.printStackTrace();
    }
   }
   
  });
  
  
  Thread t2 = new Thread(new Runnable(){

   @Override
   public void run() {
    
    try {
     process.secondMethod();
    } catch (InterruptedException e) {
    
     e.printStackTrace();
    }
   }
   
  });
  
  t1.start();
  t2.start();
  
  //Wait for threads to complete  
  t1.join();
  t2.join();
  System.out.println(" All set with threads.");
  
 }
 
} 
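
When a program like the one above hangs, the JVM itself can tell you whether it is a deadlock. The sketch below is my own illustration, not part of the samples: the class DeadlockDetectSketch is hypothetical, it forces the deadlock with a CountDownLatch instead of sleep(), and it uses daemon threads so that the JVM can still exit. ThreadMXBean.findMonitorDeadlockedThreads() is the standard java.lang.management call for deadlocks on synchronized monitors.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

class DeadlockDetectSketch {

    // Starts two daemon threads that deadlock on purpose and returns how many
    // deadlocked threads the JVM reports (0 if none were reported within ~5 seconds).
    public static int detect() {
        final Object lockA = new Object();
        final Object lockB = new Object();
        final CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        Thread t1 = new Thread(() -> lockInOrder(lockA, lockB, bothHoldFirstLock));
        Thread t2 = new Thread(() -> lockInOrder(lockB, lockA, bothHoldFirstLock));
        t1.setDaemon(true); // daemon threads do not keep the JVM alive
        t2.setDaemon(true);
        t1.start();
        t2.start();

        ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 50; attempt++) {
            long[] ids = mxBean.findMonitorDeadlockedThreads(); // null when no deadlock is found
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    private static void lockInOrder(Object first, Object second, CountDownLatch latch) {
        synchronized (first) {
            latch.countDown();
            try {
                latch.await(); // wait until the other thread holds its first lock too
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (second) { // blocks forever: the other thread owns this monitor
                System.out.println("never reached");
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Deadlocked threads: " + detect());
    }
}
```

A thread dump (for example with the jstack tool) reports the same information for a running process.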

How to avoid deadlocks ? 

Here we will use "ReentrantLock", though you can achieve the same with wait(), notify() and the synchronized keyword.
We prevent the deadlock from the case above by declaring one "ReentrantLock" per resource and acquiring all locks in one go, or none at all.

OK, let's start two threads, one calling firstMethod() and the other secondMethod().



package demo12avoiddeadlock;

/****
 * @author twreddy
 *  How to avoid deadlocks by using ReentrantLock and 
 *  acquiring the locks on both objects in one go.
 ***/
public class App {

 public static void main(String[] args) throws InterruptedException {
  final Processor process = new Processor();
  
  Thread t1 = new Thread(new Runnable(){
   @Override
   public void run() {
    try {
     process.firstMethod();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   
  });
  
  Thread t2 = new Thread(new Runnable(){
   @Override
   public void run() {
    try {
     process.secondMethod();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   
  });
  
  t1.start();
  t2.start();
  
  t1.join();
  t2.join();
  
  process.printInfo();
  System.out.println(" all set ");
 }
}
If you look at the getLocks() method, we try to acquire both locks as a single all-or-nothing step: if we cannot get both, we release whichever lock we did acquire and try again.




package demo12avoiddeadlock;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author TWreddy
 *  To avoid deadlocks, always acquire locks in the same order.
 *  An easy way is to use one ReentrantLock object per resource,
 *  but make sure you still acquire them in the same order.
 *  So it is best to have one method that just acquires the locks, and to proceed
 *  only when all the locks have been acquired.
 *  
 */
public class Processor {
  private List<Integer>  list1 = new ArrayList<Integer>();
  private List<Integer>  list2 = new ArrayList<Integer>();
  private Lock lock1 = new ReentrantLock();
  private Lock lock2 = new ReentrantLock();
  
  
  
  /**
  *  This is very basic. But what if, after getting the first lock, the 
  *  second lock is held by another thread?
  *  We would block while still holding the first lock, so this simple approach is not used here.
  */
 private void getLocksSimple(){
   //this is very basic
   lock1.lock();
   lock2.lock();
  }
  
  
  /**
   * Check for locks periodically if not available right now..
   * Sleeps every 100 ms and tries again.
   * 
  * @throws InterruptedException
  */
 private void getLocks() throws InterruptedException{
  
   while(true){
    //tryLock() never blocks: it returns false at once if the lock is held elsewhere
    boolean firstLock = lock1.tryLock();
    boolean secondLock = lock2.tryLock();
    
    if(firstLock && secondLock){
     //OK got lock on both 
     return;
    }
    
    if(firstLock){
     //Release it as we did not get both locks,
     //so a thread waiting for the first lock can make progress
     lock1.unlock();
    }
    
    if(secondLock){
     //Release it as we did not get both locks,
     //so a thread waiting for the second lock can make progress
     lock2.unlock();
    }
    
    //Else wait for 100 ms and try again 
    Thread.sleep(100);
   }
  }
  
  
  
  
  private void releaseLocks(){
   lock1.unlock();
   lock2.unlock();
  }
  
  
   public void firstMethod() throws InterruptedException{
   
    Thread.sleep(3000);
    //Acquire outside the try block: if getLocks() is interrupted we hold no locks,
    //and releaseLocks() would throw IllegalMonitorStateException.
    getLocks();
    try{
      System.out.println("F1 got Locks");
      for(int i=0; i < 1000;i++){
       list1.add(i);
       list2.add(i);
      }
    }finally{
     releaseLocks();
     System.out.println(" F1 - release locks");
    }
   }
   
   
   public void secondMethod() throws InterruptedException{
   
     Thread.sleep(3000);
     //Acquire outside the try block so that finally only runs when we hold both locks.
     getLocks();
     try{
        System.out.println(" F2- Got locks");
        for(int i=0; i < 1000;i++){
         list1.add(i);
         list2.add(i);
        }
     }finally{
      releaseLocks();
      System.out.println(" F2 - release locks");
     }
      
    System.out.println(" F2- Exit");
   }
 
   
   public void printInfo(){
    System.out.println("List1 ="+ list1.size());
    System.out.println("List2 ="+ list2.size());
   }
}
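
The Javadoc above also names the simpler rule: always acquire locks in the same order. As a minimal sketch of that rule applied to the deadlock example (the class OrderedLockingSketch and the helper addToBoth() are hypothetical names of mine), both methods take list1 before list2, so the circular wait can no longer occur:

```java
import java.util.ArrayList;
import java.util.List;

class OrderedLockingSketch {

    private final List<Integer> list1 = new ArrayList<>();
    private final List<Integer> list2 = new ArrayList<>();

    public void firstMethod() {
        addToBoth();
    }

    public void secondMethod() {
        addToBoth();
    }

    // Every caller locks list1 first and list2 second, so no thread can hold
    // list2 while waiting for list1.
    private void addToBoth() {
        synchronized (list1) {
            synchronized (list2) {
                for (int i = 0; i < 1000; i++) {
                    list1.add(i);
                    list2.add(i);
                }
            }
        }
    }

    // Runs both methods on separate threads and returns the combined list size.
    public static int run() {
        OrderedLockingSketch processor = new OrderedLockingSketch();
        Thread t1 = new Thread(processor::firstMethod);
        Thread t2 = new Thread(processor::secondMethod);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processor.list1.size() + processor.list2.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 4000
    }
}
```

The tryLock() approach shown above is still useful when the lock order cannot be fixed in advance.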

Producer/Consumer

Producer/Consumer example using the synchronized keyword, wait() and notify(). When a thread calls wait() on an object, it releases that object's lock (it keeps any locks it holds on other objects) and goes into the WAITING state until it is notified. A notify() on the same object wakes a waiting thread, which continues with the rest of its code only after it re-acquires the lock; if it loses the race against other threads competing for the lock, it keeps waiting. More at: http://stackoverflow.com/questions/13249835/java-does-wait-release-lock-from-synchronized-block

IMPORTANT:

  • wait(), notify() and notifyAll() must be called inside a synchronized method or block, on the object whose monitor is held.
  • wait() tells the current thread (the thread executing code inside the synchronized method or block) to give up the monitor and go to the waiting state.
  • notify() wakes up a single thread that is waiting on this object's monitor.
  • notifyAll() wakes up all the threads that called wait() on the same object.


Example:
In this example, this is what happens:

1. Producer - When the list is not full (including when it is empty), the producer adds data to the list and calls notify(). notify() wakes up the consumer if it is in the wait() state.
2. Producer - When the list is full, the producer calls wait() on the list, which releases the list's lock, and waits until the consumer calls notify().
3. Consumer - When the list is not empty, it reads data and calls notify() on the list. This wakes up the producer thread if it is in the waiting state.
4. Consumer - When the list is empty, the consumer calls wait() and wakes up when the producer calls notify().

Here is the code. Let's start two threads: one calls the produce() method and the other the consume() method.

package demo09producerConsumerWaitNotify;
/**
 * @author twreddy
 *
 **/
public class App {
 public static void main(String[] args)throws Exception {
  final Processor proc = new Processor();
  
  Thread t1 = new Thread(new Runnable() {
   
   @Override
   public void run() {
    while(true ){
     try {
      proc.produce();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    
   }
  });

  Thread t2 = new Thread(new Runnable() {
   @Override
   public void run() {
    while(true ){
     try {
      proc.consume();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   }
  });
  
  t1.start();
  t2.start();
 }
}

As you can see in the code below, we add data to the list and call notify() if it has enough room; otherwise we wait(). In consume() we do the opposite.
As described at http://stackoverflow.com/questions/13249835/java-does-wait-release-lock-from-synchronized-block, calling wait() releases the lock, and the thread then waits to be notified before it tries to acquire the lock again.

package demo09producerConsumerWaitNotify;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @author twreddy
   Producer -  If there is room, add to the list and notify(); otherwise wait().
   Consumer -  If empty wait() else read from list and notify(). 
*/
public class Processor{
 
 private List<Integer> numList = new ArrayList<Integer>();
 private static final int MAX_SIZE = 5;
 
 public void produce() throws InterruptedException{
  Random random = new Random();
  
   synchronized (numList) {
    if(numList.size() < MAX_SIZE){
     numList.add(random.nextInt(100));
     System.out.println("PRODUCER-List Size ="+ numList.size());
     //If we had used synchronized(this), we would call notify() on "this" instead.
     numList.notify();
    }else {
     System.out.println("PRODUCER-Waiting as Queue is full, size="+ numList.size());
     numList.wait();
    }
   }
   Thread.sleep(1000);
 }
 
 public void consume() throws InterruptedException{
   
   synchronized (numList) {
    if(numList.size() == 0){
     System.out.println("CONSUMER Waiting as there is no data++++");
     //wait() releases the lock and blocks until notified; it re-acquires the lock before returning
     numList.wait();
    }else {
     Integer num = numList.remove(0);
     System.out.println("CONSUMER Read "+ num +" size="+ numList.size());
     numList.notify();
    }
   }
   Thread.sleep(5000);
   
  }
 
}
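
The sample above re-checks the list size on every call because App calls produce() and consume() in an endless loop. When the waiting has to happen inside a single method call, the usual idiom is to call wait() inside a while loop that re-tests the condition after every wake-up. The following is a sketch of that idiom only; the class BoundedBufferSketch and the helper transfer() are hypothetical and not part of the samples.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class BoundedBufferSketch {

    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    public BoundedBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(int value) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // releases the monitor; re-check the condition after waking up
        }
        items.add(value);
        notifyAll(); // wake consumers waiting for data
    }

    public synchronized int take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        int value = items.remove();
        notifyAll(); // wake producers waiting for room
        return value;
    }

    // Pushes 0..count-1 through a buffer of size 5 and returns the sum seen by the consumer.
    public static int transfer(int count) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(5);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(20)); // 190
    }
}
```

notifyAll() is used here because producers and consumers wait on the same monitor; with notify() a wake-up could go to a thread of the wrong kind.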

ArrayBlockingQueue

Producer/Consumer example using "ArrayBlockingQueue".
  • ArrayBlockingQueue - Thread safe and can be accessed by multiple threads.
  • ArrayBlockingQueue in fact uses a ReentrantLock internally, as you can see in its implementation.
  • If the queue is full, put() blocks until space becomes available.
  • If the queue is empty, take() blocks until an element becomes available.

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @author twreddy
 *
 *  ArrayBlockingQueue - Thread safe and can be accessed by multiple threads.
 *  ArrayBlockingQueue in fact uses a ReentrantLock internally, as you can see in its implementation.
 *  
 *  If the queue is full, put() blocks until space becomes available.
 *  If the queue is empty, take() blocks until an element becomes available.
 * 
 *  Easy to implement Producer and Consumer using "ArrayBlockingQueue"
 *  instead of wait()/notify()/notifyAll() 
 *  
 */
public class App {

 private static BlockingQueue<Integer> blkQueue = new ArrayBlockingQueue<Integer>(10);
 
 public static void main(String[] args) {
  
  Thread t1 = new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     producer();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    
   }
  });
  

  Thread t2 = new Thread(new Runnable() {
   @Override
   public void run() {
    try{
     consumer();
    }catch(Exception exp){
     exp.printStackTrace();
    }
   }
  });
  
  t1.start();
  t2.start();
  
  try {
   //Wait till t1 dies 
   t1.join();
   //Wait till t2 dies 
   t2.join();
  } catch (InterruptedException e) {
   //Restore the interrupt flag instead of silently swallowing the interruption
   Thread.currentThread().interrupt();
  }
 }
 
 
 private static void producer() throws InterruptedException  {
  Random random = new Random();
  while(true ){
   Thread.sleep(1000);
   blkQueue.put(random.nextInt(100));
   System.out.println(" Added element. Current blk Size="+ blkQueue.size());
  }
 }
 
 private static void consumer() throws Exception{
  Random random = new Random();
  while(true){
   Thread.sleep(2000);
   Integer took = blkQueue.take();
   System.out.println(" took="+ took + " blk Size="+ blkQueue.size());
   
  }
 }
}
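
put() and take() block indefinitely. BlockingQueue also offers offer() and poll(), which return immediately or after a timeout instead. A small single-threaded sketch of the difference, assuming a queue of capacity 2 (the class QueueTimeoutSketch is a hypothetical name):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueTimeoutSketch {

    public static String demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        boolean first = queue.offer(1);   // true: there is room
        boolean second = queue.offer(2);  // true: queue is now full
        boolean third = queue.offer(3);   // false: returns at once instead of blocking like put()

        Integer head = queue.poll();      // 1: elements come out in FIFO order
        queue.poll();                     // removes 2, queue is now empty

        Integer none = null;
        try {
            // Waits up to 50 ms for an element, then gives up with null instead of blocking like take().
            none = queue.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return first + "," + second + "," + third + "," + head + "," + none;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true,true,false,1,null
    }
}
```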

Interrupting a thread

We can interrupt a thread by calling interrupt() on its Thread object, e.g. t1.interrupt(). (Thread.currentThread().interrupt() interrupts the calling thread itself.)
  • If the thread is in sleep() or wait(), an InterruptedException is thrown and the interrupt flag is cleared.
  • If the thread is running, a flag is set, which we can check using "Thread.currentThread().isInterrupted()".
  • If the thread is blocked in I/O on an interruptible channel (java.nio), the channel is closed, the thread's interrupt status is set, and the thread receives a "java.nio.channels.ClosedByInterruptException".
 Here is the sample code to test this.
Comment or uncomment the code below to see CASE1 and CASE2 as explained in the code.

package demo13InterruptThread;
/**
 * @author twreddy
 * Shows what a thread observes when another thread calls interrupt() on it:
 * either the interrupt flag (CASE1) or an InterruptedException (CASE2).
 */
public class App {
  public static void main(String[] args) throws Exception{
   
   Thread t1 = new Thread(){
    public void run(){
    
     while (true){
      
      /**
       * CASE1: The flag stays set only if the thread is not blocked in sleep() or wait().
       * Use case 1 if you don't have wait() or sleep() in the run method.
       */
      /*
      if(Thread.currentThread().isInterrupted()){
       System.out.println("Thank god some one interrupted , now I can stop working.");
       return;
      }else{
       System.out.println(" Keep working.....");
      }
      */
      
      
      /*  
       * CASE2: If you are in sleep() you will get an InterruptedException and you can't depend on the flag,
       *  because the flag is cleared when the exception is thrown.
       *  Use case 2, i.e. rely on the InterruptedException, when you have sleep() 
       *   or wait() in the run method.
       * */
      try{
       System.out.println(" Keep working.....");
       Thread.sleep(100);
       
       /* Comment sleep and uncomment if you want to test wait() being interrupted. 
       synchronized (this) {
        this.wait(10);
       }
       */
       
      }catch(InterruptedException exp ){
       System.out.println("I was interrupted while I was in sleep, I can't work any more :) . flag set ="+ Thread.currentThread().isInterrupted());
       break;
      }
     }
     
    }
   };
   
   
   t1.start();
   Thread.sleep(500);
   
   //Let's interrupt this thread to see if it can come out of its loop.
   t1.interrupt();
   
   t1.join();
   
  }
} 
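
Because the flag is cleared when InterruptedException is thrown, code that catches the exception but cannot stop right there usually sets the flag again, so that callers further up can still see the interruption. A minimal sketch of this (the class InterruptSketch and method demo() are hypothetical names used for illustration):

```java
class InterruptSketch {

    // Returns "<flag right after the exception>,<flag after re-interrupting>".
    public static String demo() {
        boolean[] flag = new boolean[2];

        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                // The exception has already cleared the interrupt status.
                flag[0] = Thread.currentThread().isInterrupted();
                // Restore it for any code that checks the flag later.
                Thread.currentThread().interrupt();
                flag[1] = Thread.currentThread().isInterrupted();
            }
        });

        worker.start();
        worker.interrupt(); // sleep() throws even if the interrupt arrives before it starts
        try {
            worker.join();  // join() makes the worker's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flag[0] + "," + flag[1];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // false,true
    }
}
```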

AtomicInteger

AtomicInteger is thread safe. Internally, the atomic classes make heavy use of compare-and-swap (CAS), an atomic instruction directly supported by most modern CPUs. Those instructions are usually much faster than synchronizing via locks. So my advice is to prefer atomic classes over locks in case you just have to change a single mutable variable concurrently.
Other data types to explore:
LongAdder - a long counter that many threads can add to; it tends to perform better than AtomicLong when updates are frequent and contended.
LongAccumulator - Instead of performing simple add operations, the class LongAccumulator is built around a lambda expression of type LongBinaryOperator, as demonstrated in this code sample. Note that the order of accumulation across threads is not guaranteed, so the result is only predictable for functions where the order does not matter (e.g. sum or max); the function below is order-sensitive.

eg:
 LongBinaryOperator op = (x, y) -> 2 * x + y;
 LongAccumulator accumulator = new LongAccumulator(op, 1L);
 IntStream.range(0, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));





package chap13AutomiInteger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;

/**
 * @author twreddy
 *  AtomicInteger example.  
 */
public class TestAtomic {

 static AtomicInteger atomicInt = new AtomicInteger();
 public static void main(String[] args) {
  
  ExecutorService executor = Executors.newFixedThreadPool(10); 
  TestAtomic testAtm = new TestAtomic();
  Runnable task = testAtm.getIntOpTask();
  
  //Let's run the task 100 times; each run adds 2, so the final value is 200
  IntStream.range(0, 100).forEach(i->executor.submit(task));
  
   stop(executor);
   System.out.println(atomicInt);
 }

 private Runnable getIntOpTask(){
  
  Runnable task = () -> {
   try{
   //IntUnaryOperator updateFunction 
   //accepts a lambda expression in order to perform arbitrary arithmetic operations upon the integer:
     atomicInt.updateAndGet(n -> n + 2);
    TimeUnit.MILLISECONDS.sleep(100);
   }catch(Exception exp){
    exp.printStackTrace();
   }
  };
  
  return task;
 }

 public static void stop(ExecutorService executor) {
        try {
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            System.err.println("termination interrupted");
        }
        finally {
            if (!executor.isTerminated()) {
                System.err.println("killing non-finished tasks");
            }
            executor.shutdownNow();
        }
    }
 
}
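
LongAdder, mentioned above, is used in much the same way as the AtomicInteger in this sample. The sketch below is my own illustration (the class LongAdderSketch, the method count() and the pool size of 4 are assumptions): several tasks increment one shared adder and the total is read once all of them have finished.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

class LongAdderSketch {

    // Submits `tasks` tasks that each increment a shared LongAdder `incrementsPerTask` times.
    public static long count(int tasks, int incrementsPerTask) {
        LongAdder adder = new LongAdder();
        ExecutorService executor = Executors.newFixedThreadPool(4);

        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                for (int j = 0; j < incrementsPerTask; j++) {
                    adder.increment(); // thread safe without any explicit lock
                }
            });
        }

        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return adder.sum(); // read the total once all tasks have finished
    }

    public static void main(String[] args) {
        System.out.println(count(10, 1000)); // 10000
    }
}
```

sum() is meant to be read after the updates are done; while updates are still running it only gives a moving snapshot.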


ConcurrentHashMap

Like Hashtable, ConcurrentHashMap is thread safe, but it achieves this differently. Hashtable simply synchronizes every operation on one lock for the whole table, whereas ConcurrentHashMap locks only the part of the table being updated, which greatly improves
performance under concurrent access. Both are thread safe, but there are obvious performance wins with ConcurrentHashMap.
In particular, reading from a ConcurrentHashMap using get() generally does not block, contrary to Hashtable, for which all operations are simply synchronized. Hashtable dates back to the earliest versions of Java, whereas ConcurrentHashMap was added in Java 5.


package chap15ConcurrentMap;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;

public class TestConcurrentMap {

 
 public static void main(String[] args) {
 
  //ConcurrentMap
  ConcurrentHashMap<String, String> map = getMap();
  testNewMapFunctions(map);
  //The common pool parallelism depends on the number of available cores.
  //Four CPU cores are available on my machine, which results in a parallelism of three:
  System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 3 on a four-core machine
  foreachParallelTest(map);
  testSearchParallel(map);
  serachByValue(map);
  testReduceMethod(map);
 }



 private static void testReduceMethod(ConcurrentHashMap<String, String> map) {
  //Reduce 
  String result = map.reduce(1,
       (key, value) -> {
           System.out.println("Transform: " + Thread.currentThread().getName());
           return key + "=" + value;
       },
       /* s1 is one key value pair from previous merge. Similarly s2.*/
       (s1, s2) -> {
           System.out.println("Reduce: " + Thread.currentThread().getName());
           //System.out.println(" s1="+s1);
           //System.out.println(" s2="+s2);
           return s1 + ", " + s2;
       });

   System.out.println("Reduced result : " + result);
 }

 
 
 private static void serachByValue(ConcurrentHashMap<String, String> map) {
  //Searching based on values of the keys 
  System.out.println("============== serachByValue parallel test  ===========");
  String result = map.searchValues(1, value -> {
      System.out.println(Thread.currentThread().getName());
      if (value.contains("Chandra")) {
          return value;
      }
      return null;
  });

  System.out.println("serachByValue: " + result);
 }

 private static void testSearchParallel(ConcurrentHashMap<String, String> map) {
  //SEARCH
  System.out.println("============== search parallel test  ===========");
  // 1 indicates parallelismThreshold  number of elements
  //needed for this operation to be executed in parallel.
  String result = map.search(1,(key, value) -> {
      System.out.println(Thread.currentThread().getName());
      if ("foo".equals(key)) {
          return value;
      }
      return null;
  });
  System.out.println("Result: " + result);
 }
 

 private static void foreachParallelTest(ConcurrentHashMap<String, String> map) {
  System.out.println("============== foreachParallelTest ===========");
  //Java 8 introduces three kinds of parallel operations: forEach, search and reduce
  //FOREACH
  //parallelismThreshold =1 here is the (estimated) number of elements
     //needed for this operation to be executed in parallel.
  map.forEach(1, (key, value) ->
     System.out.printf("key: %s; value: %s; thread: %s\n",
         key, value, Thread.currentThread().getName()));
 }

 /**
  * Some cool new methods you can use on a concurrent map.
  * @param map
  */
 private static void testNewMapFunctions(ConcurrentMap<String, String> map) {
  map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
  
  //putIfAbsent
  String value = map.putIfAbsent("c3", "p1");
  System.out.println(value);    // p0
  
  //PutIfAbsent
  String value1 = map.putIfAbsent("Rama", "chandra");
  System.out.println(value1);    // "null"
  
  //GetOrDefault
  String value2 = map.getOrDefault("hi", "there");
  System.out.println(value2);    // there
  
  map.replaceAll((key, val) -> "Rama".equals(key) ? "Chandraaaaaaaaaa" : val);
  System.out.println(map.get("Rama"));    // Chandraaaaaaaaaa
  
  //append someting 
  map.compute("foo", (key, val1) -> val1 + val1);
  System.out.println(map.get("foo"));   // barbar
  
  //Merge accepts a key, the new value to be merged into the existing entry and a bi-function to specify the merging behavior of both values:
  map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
  System.out.println(map.get("foo"));   // boo was barbar
 }
 
 public static ConcurrentHashMap<String, String> getMap(){
  ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
  map.put("foo", "bar");
  map.put("han", "solo");
  map.put("r2", "d2");
  map.put("c3", "p0");
  return map;
 }
}
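
A typical use of merge() on a ConcurrentHashMap is counting from several threads at once, since each merge() call is atomic for its key. A short sketch (the class MergeCountSketch and the word list are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class MergeCountSketch {

    public static String demo() {
        List<String> words = Arrays.asList("a", "b", "a", "c", "a");
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();

        // The parallel stream may call merge() from several threads; merge() inserts 1
        // for a new key or atomically adds 1 to the existing count.
        words.parallelStream().forEach(word -> counts.merge(word, 1, Integer::sum));

        return "a=" + counts.get("a") + ",b=" + counts.get("b") + ",c=" + counts.get("c");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // a=3,b=1,c=1
    }
}
```

The same loop with a plain HashMap would not be safe, because HashMap makes no guarantees under concurrent modification.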


Leave your comments if you have any suggestions, corrections etc..
