Sunday, July 06, 2014

Using an MDB (Message-Driven Bean) - JBoss EAP 6.0 or JBOSS 7


The following topic are covered in this article.
  • MDBs are slowly dying in JBoss EAP
  • MDB in JBoss EAP will not reconnect to remote JMS provider after restarting  the message server
  • How to configure MDB's in JBOSS EAP 6 ?
  • How to deploy MDBS in Jboss EAP 6 ?
  • Configuring Message-Driven Beans
  • JBoss MDB 3.0 - EJB 3 - JBoss application server tutorial
  • Working MDB example in JBoss 7.0.1
  • Helloword Using an MDB (Message-Driven Bean) - JBoss EAP 6.0 or JBOSS 7
  • Java code to send messages to MDB or Sending messages to an MDB


Here is some some sample MDB's that I created to  tests on EAP 6  or JBOSS 7 
During the testing process we noticed that sometime MDBS would loose connection with message server and would never reconnect again.
Also whenever we re start message server all mdb's would loose communication with message server and would never re-establish connection..
To fix we need to add few additional properties as shown below and there is brief description as why we need each of that property.


@ActivationConfigProperty(propertyName = "clientFailureCheckPeriod", propertyValue = "600000"),
@ActivationConfigProperty(propertyName = "connectionTTL", propertyValue = "-1")

  Inspect the network for any issues that might cause clients to become disconnected from the server. 
   Consider adjusting the <connection-ttl> (default is 60000ms) and <check-period> (default is 30000ms) 
   on the appropriate JMS connection factory. If network latency is the problem then increasing 
   these values will help mitigate this issue.

   To be clear, the "clientFailureCheckPeriod" can be any number > 0. 
   The value "600000" equates to 10 minutes which seems reasonable (the default is 30 seconds).
   The connection-ttl determines how long the server will keep a connection alive in the 
       absence of any data arriving from the client. The client will automatically send "ping" 
        packets periodically to prevent the server from closing it down. If the server doesn't receive 
any packets on a connection for the connection TTL time,
then it will automatically close all the sessions on the server that relate to that connection.


@ActivationConfigProperty(propertyName="reconnectAttempts", propertyValue="-1"),
@ActivationConfigProperty(propertyName="reconnectInterval", propertyValue="-1")

Please note that one can also adjust the "reconnectInterval" which  controls how long to wait 
between reconnection attempts (the value is in  seconds and is 10 by default).
The "reconnectAttempts" and "reconnectInterval" can also be set through the MDB's 
ejb-jar.xml or globally for all MDBs through ejb3-interceptors-aop.xml 
as demonstrated with the "maxSession" property in Configure maximum JMS sessions for EJB3 MDB in JBoss EAP

Lets show this with an example. The assumption here is you have a queue manager configured in your jboss stand-alone.xml and you have queue named 
"RAMA.TEST.QUEUE".

So lets call your queue manager name as "LOCAL_QUEUE_MANAGER"


NOTE: see this link for more on jms configuration using hornet queue as provider for JBOSS eap 6. 
http://www.mastertheboss.com/jboss-jms/jboss-jms-configuration. 
So lets get into how we can create an MDB and will use a sample test client.

So basically this queue manager will have an entry in your stand-alone.xml something similar to this 


Connection Factory looks like :
==============================
NOTE: Here netty-remote is pointing to one of our remote message servers.

 <jms-connection-factories>
 ....
 .....
    <pooled-connection-factory name="LOCAL_QUEUE_MANAGER">
<user>rama</user>
<password>tests123</password>
<transaction mode="xa"/>
<connectors>
   <connector-ref connector-name="netty-remote"/>
</connectors>
<entries>
   <entry name="java:/LOCAL_QUEUE_MANAGER"/>
</entries>
    </pooled-connection-factory>
 </jms-connection-factories>

Queue config looks like  below in your stand-alone.xml or domain.xml:
==================================================================

<jms-destinations>
    <jms-queue name="RAMA.TEST.QUEUE">
<entry name="java:jboss/exported/jms/queue/RAMA.TEST.QUEUE"/>
<entry name="jms/queue/RAMA.TEST.QUEUE"/>
<durable>true</durable>
    </jms-queue>
......
.......
 </jms-destinations>

Now create and EJB project in your eclipse or Intellij and use the below code.

/**
 *  My test mdb
 */
package com.rama.tst.mdb;
import java.util.Date;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.jboss.ejb3.annotation.ResourceAdapter;
@TransactionManagement(TransactionManagementType.CONTAINER)
@ResourceAdapter("LOCAL_QUEUE_MANAGER")
@MessageDriven(messageListenerInterface = MessageListener.class, activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "RAMA.TEST.QUEUE"),
@ActivationConfigProperty(propertyName = "clientFailureCheckPeriod", propertyValue = "600000"),
@ActivationConfigProperty(propertyName = "connectionTTL", propertyValue = "-1"),
//************************* RECONNECT PARAMETERS  **************************@ActivationConfigProperty(propertyName="reconnectAttempts", propertyValue="-1"),
@ActivationConfigProperty(propertyName="reconnectInterval", propertyValue="-1")})
public class TestMDB extends TestAbstractMDB {
private static final long serialVersionUID = 1231L;
public TestMDB(){
System.out.println(" ******************** New MDB obj was created at "+ new Date());
System.out.println(" ********************  New MDB obj "+ this.toString());
}
public void consumeMessage(Message message) {
String messageStr = "";
try {
messageStr = JMSUtil.getMessageAsString(message);
System.out.println("====== We received your message at "+ new Date() + " ======" );


} catch (Throwable t) {
t.printStackTrace();
} finally {
}
}
}
where TestAbstractMDB is the super class which has some basic methods.

package com.rama.tst.mdb;
import javax.ejb.EJBException;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/** * * @author twreddy Parent with some common methods dumped here. * */ public abstract class AbstractMDB implements MessageDrivenBean,MessageListener { private static final Logger logger = LogManager.getLogger(AbstractMDB.class); public static final long serialVersionUID = 123; private MessageDrivenContext context; /** * @return the context */ public MessageDrivenContext getContext() { return this.context; } /** * @param context * the context */ public void setContext(MessageDrivenContext context) { this.context = context; } public void ejbCreate() { } public void setMessageDrivenContext(MessageDrivenContext newContext) throws EJBException { this.context = newContext; }
public void onMessage(Message message) { this.logMessageProperties(message); try { this.consumeMessage(message); } catch (Exception e) { } } /** * Logs JMS Properties extracted from the JMS Message * @param message the JMS message */ protected final void logMessageProperties(Message message) { try { this.logger.debug("Received Message:"); this.logger.debug("JMSMessageID :" + message.getJMSMessageID()); this.logger.debug("JMSXDeliveryCount:" + message.getIntProperty("JMSXDeliveryCount"));//Optional property this.logger.debug("CorrelationID :" + message.getJMSCorrelationID()); this.logger.debug("JMSDeliveryMode :" + message.getJMSDeliveryMode()); this.logger.debug("JMSDestination :" + message.getJMSDestination()); this.logger.debug("JMSExpiration :" + message.getJMSExpiration()); this.logger.debug("JMSPriority :" + message.getJMSPriority()); this.logger.debug("JMSRedelivered :" + message.getJMSRedelivered()); this.logger.debug("JMSReplyTo :" + message.getJMSReplyTo()); this.logger.debug("JMSTimestamp :" + message.getJMSTimestamp()); this.logger.debug("JMSType :" + message.getJMSType()); } catch(JMSException e){ this.logger.info("An error occurred while extracting JMS Properties from the message", e); } } /** * ejbRemove */ public void ejbRemove() { logger.debug("AbstractMDB: inside MDB ejbRemove" + this); } public abstract void consumeMessage(Message message); /** * Reads correlattion from jms message if null checks for message id * * @param message * @return * @throws JMSException */ public String getCorrelationId(Message message) throws JMSException { String correlationId = message.getJMSCorrelationID(); logger.debug("correlationId =" + correlationId); if ((correlationId == null) || (correlationId.trim().length() == 0)) { logger.debug("correlationId is null. Stamping incoming message id as correlation id.="+ correlationId); correlationId = message.getJMSMessageID(); } logger.debug("correlationId =" + correlationId); return correlationId; } }
And here is the Util class to extract messages as String

//And here is the Util class to extract messages as String package com.rama.tst.mdb; //Java SE dependencies import java.io.ByteArrayOutputStream; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.StreamMessage; import javax.jms.TextMessage; /** * Provides Utility methods for JMS Messages. */ public class JMSUtil { private static final int BYTE_BUFFER_SIZE = 512; /** * Given a JMS <tt>BytesMessage</tt>, it reads the data and returns it * as a String. * @param bytesMsg the bytes message * @return the message as <code>String</code> */ public static String getBytesMessageAsString(BytesMessage bytesMsg) throws JMSException{ ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] byteBuffer = new byte[JMSUtil.BYTE_BUFFER_SIZE]; // Read in the bytes int bytesRead = 0; while ( (bytesRead = bytesMsg.readBytes(byteBuffer)) != -1){ bos.write(byteBuffer, 0, bytesRead); //Reset byteBuffer for(int i=0; i<bytesRead; i++){ byteBuffer[i] = 0; } } return bos.toString(); } /** * Given a JMS <tt>Message</tt>, it extracts the message and returns it as String. * * @param message the jms message * @return the jms message as String * * @throws IllegalArgumentException if an invalid argument is passed * @throws Exception if an error occurs while processing */ public static String getMessageAsString(Message message)throws Exception{ if (message == null){ throw new IllegalArgumentException("Message cannot be null"); } String msgStr = null; if (message instanceof StreamMessage) { StreamMessage streamMsg = (StreamMessage) message; msgStr = streamMsg.readString(); } else if (message instanceof TextMessage) { TextMessage textMsg = (TextMessage) message; msgStr = textMsg.getText(); } else if (message instanceof BytesMessage) { BytesMessage bytesMsg = (BytesMessage) message; msgStr = JMSUtil.getBytesMessageAsString(bytesMsg); } else { throw new Exception("Unsupported message type :"+ message.getClass().getName()); } return msgStr; } }

Client to send messages to Hornet Queue or Send messages to JMS Queue or Send messages to MDB.


package com.rama.test.message; import java.io.File; import java.util.Date; import java.util.Properties; import javax.jms.Message; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import org.apache.commons.io.FileUtils; import com.rama.tst.mdb.JMSUtil; /* A simple test client to send, delete or read messages. */ public class TestMDB { String fileName="C:/temp/message.txt"; public static String queueName = "jms/queue/RAMA.TEST.QUEUE"; public static String deleteFromQueueName = "jms/queue/RAMA.TEST.QUEUE"; public static String readFromQueueName = "jms/queue/RAMA.TEST.QUEUE"; public static String sendQueueName = "jms/queue/RAMA.TEST.QUEUE"; public static String connectionFactoryName = "jms/RemoteConnectionFactory"; public static String messageQueueUserId="rama"; public static String messageQueuePassword="tests123!"; private static final int timeout = 10000; public static void main(String[] args) throws Exception{ String message = getMessage(); //deleteAllMessagesOnQueues(); for(int i=0;i <= 1; i++){ sendMessgToMdb(message); Thread.sleep(100L); } //readMessgFromQueue(); //readAllMessgFromQueue(); //sendBulkMessages(); //System.out.println("=======Finished========"); } public static InitialContext getInitialContext()throws Exception{ Properties props = new Properties(); props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory"); props.put(Context.PROVIDER_URL, "remote://rama-remote-desktop:4447"); props.put(Context.SECURITY_PRINCIPAL,messageQueueUserId); props.put(Context.SECURITY_CREDENTIALS,messageQueuePassword); InitialContext ic = new InitialContext(props); return ic; } public static void sendBulkMessages() throws Exception { QueueSession session = null; QueueConnection connection = null; QueueConnectionFactory queueConnFactory = null; InitialContext ic = null; try{ ic = getInitialContext(); System.out.println(ic.getEnvironment()); queueConnFactory =(QueueConnectionFactory) ic.lookup(connectionFactoryName); Queue queue = (Queue) ic.lookup(sendQueueName); System.out.println(queueConnFactory); connection = queueConnFactory.createQueueConnection(messageQueueUserId, messageQueuePassword); session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); File files[] = (new File("C:/Temp/")).listFiles(); int count=0; for(File fileName:files){ String fileStr = FileUtils.readFileToString(fileName); count++; TextMessage message = session.createTextMessage(); message.setText(fileStr); QueueSender sender = session.createSender(queue);; sender.send(message); System.out.println("Processing File:"+ fileName.getAbsolutePath()); } System.out.println("Total="+ count); }catch(Exception e){ e.printStackTrace(); } finally { session.close(); connection.close(); ic.close(); } System.out.println("Sent message"); } public static void sendMessgToMdb(String text) throws Exception { QueueSession session = null; QueueConnection connection = null; QueueConnectionFactory queueConnFactory = null; InitialContext ic = null; try{ ic = getInitialContext(); System.out.println(ic.getEnvironment()); queueConnFactory =(QueueConnectionFactory) ic.lookup(connectionFactoryName); Queue queue = (Queue) ic.lookup(sendQueueName); //We added same password for both local and Dev. System.out.println(queueConnFactory); System.out.println(queue); connection = queueConnFactory.createQueueConnection(messageQueueUserId, messageQueuePassword); session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); TextMessage message = session.createTextMessage(); message.setText(text); final QueueSender sender = session.createSender(queue); sender.send(message); }catch(Exception e){ e.printStackTrace(); } finally { session.close(); connection.close(); ic.close(); } System.out.println("Sent message"); } /* Read multiple messagea and dump it to a file. */ public static void readAllMessgFromQueue() throws Exception { Queue queue = (Queue) getInitialContext().lookup(readFromQueueName); QueueConnectionFactory factory =(QueueConnectionFactory) getInitialContext().lookup(connectionFactoryName); QueueConnection connection = factory.createQueueConnection(messageQueueUserId, messageQueuePassword); QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); QueueReceiver receiver = session.createReceiver(queue); connection.start(); while(true){ Message message = receiver.receive(timeout); if(message==null){ //throw new Exception("Timeout, no mesages returned on response queue."); break; } String payLoad = JMSUtil.getMessageAsString(message); logMessageProperties(message); System.out.println("CorrelationId:"+message.getJMSCorrelationID()); System.out.println("MessageId:"+message.getJMSMessageID()); System.out.println("Recieved message : " + payLoad); String fileName = message.getJMSMessageID().replace("ID:","")+".xml"; try{ String fullFileName = "C:/Temp/"+fileName; FileUtils.writeStringToFile(new File(fullFileName),payLoad); System.out.println(" File written to :"+ fullFileName); }catch(Exception e){ e.printStackTrace(); } } session.close(); connection.close(); System.out.println("=== Done ===="); } public static String readMessgFromQueue() throws Exception { Queue queue = (Queue) getInitialContext().lookup(readFromQueueName); QueueConnectionFactory factory =(QueueConnectionFactory) getInitialContext().lookup(connectionFactoryName); QueueConnection connection = factory.createQueueConnection(messageQueueUserId, messageQueuePassword); QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); QueueReceiver receiver = session.createReceiver(queue); connection.start(); Message message = receiver.receive(timeout); if(message==null){ throw new Exception("Timeout, no mesages returned on response queue."); } logMessageProperties(message); session.close(); connection.close(); System.out.println("CorrelationId:"+message.getJMSCorrelationID()); System.out.println("MessageId:"+message.getJMSMessageID()); System.out.println("Recieved message : " + JMSUtil.getMessageAsString(message)); return JMSUtil.getMessageAsString(message); } private static void deleteAllMessagesOnQueues() throws Exception { QueueConnectionFactory factory =(QueueConnectionFactory) getInitialContext().lookup(connectionFactoryName); QueueConnection connection = factory.createQueueConnection(messageQueueUserId, messageQueuePassword); QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); //Queue queue = (Queue) getInitialContext().lookup(respQueueName); Queue queue = session.createQueue(deleteFromQueueName); QueueReceiver receiver = session.createReceiver(queue); connection.start(); int i = 0; while(true){ Message message = receiver.receive(timeout); if(message == null){ break; }else{ //System.out.println("Deleting message:"+ JMSUtil.getMessageAsString(message)); System.out.println("Deleting message:"+ i); } i++; } System.out.println(i + " messages deleted from:"+ deleteFromQueueName); session.close(); connection.close(); } public static String getMessage() throws RuntimeException{ try { return FileUtils.readFileToString(new File(fileName)); }catch(Exception e){ e.printStackTrace(); throw new RuntimeException(e); } } }

Hope this above sample code helps to setup an MDB and test with the client.


1 comment: