JMS-Based Message-Driven Beans





Message-driven beans (MDBs) are stateless, server-side, transaction-aware components for processing asynchronous messages delivered via JMS. While a message-driven bean is responsible for processing messages, its container manages the component's environment, including transactions, security, resources, concurrency, and message acknowledgment. It's particularly important to note that the container manages concurrency. The thread safety provided by the container gives MDBs a significant advantage over traditional JMS clients, which must be custom built to manage resources, transactions, and security in a multithreaded environment. An MDB can process hundreds of JMS messages concurrently because numerous instances of the MDB can execute concurrently in the container.
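The concurrency model described above can be pictured with a small, plain-Java analogy. This is only a sketch under stated assumptions: ToyContainer and Processor are hypothetical names, not EJB APIs, and a real container manages its instance pool in its own vendor-specific way. What the sketch shows is the key idea: each bean instance is used by only one thread at a time, so the bean's own fields need no locking even though many messages are processed in parallel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for an MDB container; none of these types are EJB APIs.
final class ToyContainer {

    // Hypothetical bean class: each instance is confined to one worker thread.
    static final class Processor {
        private int handled; // per-instance state, never shared between threads

        int onMessage(String body) {
            return ++handled;
        }
    }

    // Delivers messageCount messages through poolSize worker threads and
    // returns how many deliveries completed.
    static int deliverAll(int messageCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        ThreadLocal<Processor> instance = ThreadLocal.withInitial(Processor::new);
        AtomicInteger delivered = new AtomicInteger();
        for (int i = 0; i < messageCount; i++) {
            final String body = "reservation-" + i;
            pool.execute(() -> {
                instance.get().onMessage(body); // one instance per worker thread
                delivered.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("delivery did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return delivered.get();
    }
}
```

Calling ToyContainer.deliverAll(100, 8) pushes 100 messages through eight worker threads, each with its own Processor instance.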

A message-driven bean is a complete enterprise bean, just like a session or entity bean, but there are some important differences. While a message-driven bean has a bean class, it does not have a remote or local business interface. These interfaces are absent because the message-driven bean responds only to asynchronous messages.

The ReservationProcessor EJB

The ReservationProcessor EJB is a message-driven bean that receives JMS messages notifying it of new reservations. The ReservationProcessor EJB is an automated version of the TravelAgent EJB that processes reservations sent via JMS. These messages might come from another application in the enterprise or from an application in some other organization, perhaps another travel agent. When the ReservationProcessor EJB receives a message, it creates a new Reservation EJB (adding it to the database), processes the payment using the ProcessPayment EJB, and sends out a ticket. This process is illustrated in Figure.

The ReservationProcessor EJB processing reservations


The ReservationProcessorBean Class

Here is a partial definition of the ReservationProcessorBean class. Some methods are left empty; they will be filled in later. Notice that the onMessage( ) method contains the business logic; it is similar to the business logic developed in the bookPassage( ) method of the TravelAgent EJB in Chapter 11. Here's the code:

package com.titan.reservationprocessor;

import javax.jms.*;
import com.titan.domain.*;
import com.titan.processpayment.*;
import com.titan.travelagent.*;
import java.util.Date;
import javax.ejb.*;
import javax.annotation.*;
import javax.persistence.*;

@MessageDriven(activationConfig={
                @ActivationConfigProperty(
                     propertyName="destinationType",
                     propertyValue="javax.jms.Queue"),
                @ActivationConfigProperty(
                     propertyName="messageSelector",
                     propertyValue="MessageFormat = 'Version 3.4'"),
                @ActivationConfigProperty(
                     propertyName="acknowledgeMode",
                     propertyValue="Auto-acknowledge")})
public class ReservationProcessorBean implements javax.jms.MessageListener {
    @PersistenceContext(unitName="titanDB")
    private EntityManager em;

    @EJB
    private ProcessPaymentLocal process;

    @Resource(mappedName="ConnectionFactory")
    private ConnectionFactory connectionFactory;

    public void onMessage(Message message) {
        try {
            MapMessage reservationMsg = (MapMessage)message;

            int customerPk = reservationMsg.getInt("CustomerID");
            int cruisePk = reservationMsg.getInt("CruiseID");
            int cabinPk = reservationMsg.getInt("CabinID");

            double price = reservationMsg.getDouble("Price");

            // get the credit card
            Date expirationDate =
                new Date(reservationMsg.getLong("CreditCardExpDate"));
            String cardNumber = reservationMsg.getString("CreditCardNum");
            String cardType = reservationMsg.getString("CreditCardType");
            CreditCardDO card = new CreditCardDO(cardNumber,
                expirationDate, cardType);

            Customer customer = em.find(Customer.class, customerPk);
            Cruise cruise = em.find(Cruise.class, cruisePk);
            Cabin cabin = em.find(Cabin.class, cabinPk);

            Reservation reservation = new Reservation(
                          customer, cruise, cabin, price, new Date( ));
            em.persist(reservation);

            process.byCredit(customer, card, price);

            TicketDO ticket = new TicketDO(customer,cruise,cabin,price);

            deliverTicket(reservationMsg, ticket);
        } catch(Exception e) {
            throw new EJBException(e);
        }
    }

    public void deliverTicket(MapMessage reservationMsg, TicketDO ticket) {

        // send it to the proper destination
    }

}

MessageDrivenContext

Message-driven beans also have a context object that is similar in functionality to that of the javax.ejb.SessionContext described in Chapter 11. This object may be injected using the @javax.annotation.Resource annotation:

@Resource MessageDrivenContext context;

The MessageDrivenContext simply extends the EJBContext; it does not add any new methods. The EJBContext is defined as follows:

package javax.ejb;
public interface EJBContext {

    // transaction methods
    public javax.transaction.UserTransaction getUserTransaction( )
        throws java.lang.IllegalStateException;
    public boolean getRollbackOnly( ) throws java.lang.IllegalStateException;
    public void setRollbackOnly( ) throws java.lang.IllegalStateException;

    // EJB home methods
    public EJBHome getEJBHome( );
    public EJBLocalHome getEJBLocalHome( );

    // security methods
    public java.security.Principal getCallerPrincipal( );
    public boolean isCallerInRole(java.lang.String roleName);
    // deprecated methods
    public java.security.Identity getCallerIdentity( );
    public boolean isCallerInRole(java.security.Identity role);
    public java.util.Properties getEnvironment( );

}

Only the transactional methods that MessageDrivenContext inherits from EJBContext are available to message-driven beans. The home methods, getEJBHome( ) and getEJBLocalHome( ), throw a RuntimeException if invoked, because MDBs do not have home interfaces or EJB home objects. The security methods, getCallerPrincipal( ) and isCallerInRole( ), also throw a RuntimeException if invoked on a MessageDrivenContext. When an MDB services a JMS message, there is no "caller," so there is no security context to be obtained from the caller. Remember that JMS is asynchronous and doesn't propagate the sender's security context to the receiver; that wouldn't make sense, since senders and receivers tend to operate in different environments.

MDBs usually execute in a container-initiated or bean-initiated transaction, so the transaction methods allow the MDB to manage its context. The transaction context is not propagated from the JMS sender; it is either initiated by the container or initiated by the bean explicitly using javax.transaction.UserTransaction. The transaction methods in EJBContext are explained in more detail in Chapter 16.

Message-driven beans also have access to their own JNDI ENCs, which provide the MDB instances access to environment entries, other enterprise beans, and resources. For example, the ReservationProcessor EJB takes advantage of the ENC to obtain references to the Titan EntityManager, the ProcessPayment EJB, and a JMS ConnectionFactory and Queue for sending out tickets.

MessageListener interface

MDBs usually implement the javax.jms.MessageListener interface, which defines the onMessage( ) method. This method processes the JMS messages received by a bean.

package javax.jms;
public interface MessageListener {
    public void onMessage(Message message);
}

Although MDBs usually implement this interface, we will see later in this chapter that MDBs can integrate with other messaging systems that define a different interface contract.

Taskflow and integration for B2B: onMessage( )

The onMessage( ) method is where all the business logic goes. As messages arrive, the container passes them to the MDB via the onMessage( ) method. When the method returns, the MDB is ready to process a new message. In the ReservationProcessor EJB, the onMessage( ) method extracts information about a reservation from a MapMessage and uses that information to create a reservation in the system:

public void onMessage(Message message) {
    try {
        MapMessage reservationMsg = (MapMessage)message;

        int customerPk = reservationMsg.getInt("CustomerID");
        int cruisePk = reservationMsg.getInt("CruiseID");
        int cabinPk = reservationMsg.getInt("CabinID");

        double price = reservationMsg.getDouble("Price");

        // get the credit card

        Date expirationDate =
            new Date(reservationMsg.getLong("CreditCardExpDate"));
        String cardNumber = reservationMsg.getString("CreditCardNum");
        String cardType = reservationMsg.getString("CreditCardType");
        CreditCardDO card = new CreditCardDO(cardNumber,
            expirationDate, cardType);

JMS is frequently used as an integration point for business-to-business (B2B) applications, so it's easy to imagine the reservation message coming from one of Titan's business partners (perhaps a third-party processor or branch travel agency).

The ReservationProcessor EJB needs to access the Customer, Cruise, and Cabin entities in order to process the reservation. The MapMessage contains the primary keys for these entities; the ReservationProcessor EJB uses the injected EntityManager to look up the entity beans:

public void onMessage(Message message) {
    ...
    Customer customer = em.find(Customer.class, customerPk);
    Cruise cruise = em.find(Cruise.class, cruisePk);
    Cabin cabin = em.find(Cabin.class, cabinPk);
    ...
}

Once the information is extracted from the MapMessage, it is used to create a reservation and process the payment. This is basically the same taskflow that was used by the TravelAgent EJB in Chapter 11. A Reservation entity is created that represents the reservation itself, and the injected ProcessPayment EJB is used to process the credit card payment:

Reservation reservation = new Reservation(
               customer, cruise, cabin, price, new Date( ));
em.persist(reservation);

process.byCredit(customer, card, price);

TicketDO ticket = new TicketDO(customer,cruise,cabin,price);

deliverTicket(reservationMsg, ticket);

Like a session bean, the MDB can access any other session bean and use that bean to complete a task. An MDB can manage a process and interact with other beans as well as resources. For example, it is commonplace for an MDB to use JDBC to access a database based on the contents of the message it is processing.

Sending messages from a message-driven bean

An MDB can also send messages using JMS. The deliverTicket( ) method sends the ticket information to a destination defined by the sending JMS client:

public void deliverTicket(MapMessage reservationMsg, TicketDO ticket)
    throws JMSException{

    Queue queue = (Queue)reservationMsg.getJMSReplyTo( );
    Connection connect = connectionFactory.createConnection( );
    Session session = connect.createSession(true,0);
    MessageProducer sender = session.createProducer(queue);
    ObjectMessage message = session.createObjectMessage( );
    message.setObject(ticket);
    sender.send(message);

    connect.close( );
}

Every message type has two parts: a message header and a message body (a.k.a. the payload). The message header contains routing information and may also have properties for message filtering and other attributes. One of these attributes may be JMSReplyTo. The message's sender may set the JMSReplyTo attribute to any destination accessible to its JMS provider. In the case of the reservation message, the sender set the JMSReplyTo attribute to the queue to which the resulting ticket should be sent. Another application can access this queue to read tickets and distribute them to customers or store the information in the sender's database.

You can also use the JMSReplyTo address to report business errors. For example, if the cabin is already reserved, the ReservationProcessor EJB might send an error message to the JMSReplyTo queue explaining that the reservation could not be processed. Including this type of error handling is left as an exercise for the reader.

@MessageDriven

MDBs are identified using the @javax.ejb.MessageDriven annotation or, alternatively, are described in an EJB deployment descriptor. An MDB can be deployed alone, but it's more often deployed with the other enterprise beans that it references. For example, the ReservationProcessor EJB uses the ProcessPayment EJB as well as the Titan EntityManager, so it is feasible to deploy all of these beans within the same Java EE deployment.

@ActivationConfigProperty

We'll see later that because MDBs can receive messages from arbitrary messaging providers, the configuration must be very flexible to be able to describe the proprietary properties that different providers will have. JCA-based MDBs don't necessarily use JMS as the message service, so this requirement is very important. To facilitate this, the @MessageDriven.activationConfig( ) attribute takes an array of @ActivationConfigProperty annotations. These annotations are simply a set of name/value pairs that describe the configuration of your MDB.

@MessageDriven(activationConfig={
                @ActivationConfigProperty(
                     propertyName="destinationType",
                     propertyValue="javax.jms.Queue"),
                @ActivationConfigProperty(
                     propertyName="messageSelector",
                     propertyValue="MessageFormat = 'Version 3.4'"),
                @ActivationConfigProperty(
                     propertyName="acknowledgeMode",
                     propertyValue="Auto-acknowledge")})
public class ReservationProcessorBean implements javax.jms.MessageListener {
  ...
}

The property names and values used in the activationConfig( ) attribute to describe the messaging service vary depending on the type of message service used, but EJB 3.0 defines a set of fixed properties for JMS-based message-driven beans. These properties are acknowledgeMode, messageSelector, destinationType, and subscriptionDurability.

Message selector

An MDB can declare a message selector. Message selectors allow an MDB to be more selective about the messages it receives from a particular topic or queue. Message selectors use Message properties as criteria in conditional expressions.[*] These conditional expressions use Boolean logic to declare which messages should be delivered. A message selector is declared using the standard property name, messageSelector, in an activation configuration element:

[*] Message selectors are also based on message headers, which are outside the scope of this chapter.

@ActivationConfigProperty(
               propertyName="messageSelector",
               propertyValue="MessageFormat = 'Version 3.4'"),

Message selectors are based on message properties. Message properties are additional headers that can be assigned to a message; they allow vendors and developers to attach information to a message that isn't part of the message's body. The Message interface provides several methods for reading and writing properties. Properties can have a String value or one of several primitive values (boolean, byte, short, int, long, float, double). The naming of properties, together with their values and conversion rules, is strictly defined by JMS.

The ReservationProcessor EJB uses a message selector filter to select messages of a specific format. In this case, the format is "Version 3.4"; this is a string that Titan uses to identify messages of type MapMessage that contain the name values CustomerID, CruiseID, CabinID, CreditCard, and Price. In other words, adding a MessageFormat to each reservation message allows us to write MDBs that are designed to process different kinds of reservation messages. If a new business partner needs to use a different type of Message object, Titan would use a new message version and an MDB to process it.

Here's how a JMS producer would go about setting a MessageFormat property on a Message:

Message message = session.createMapMessage( );
message.setStringProperty("MessageFormat","Version 3.4");

// set the reservation named values

sender.send(message);

The message selectors are based on a subset of the SQL-92 conditional expression syntax that is used in the WHERE clauses of SQL statements. They can become fairly complex, including the use of literal values, Boolean expressions, unary operators, and so on.
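To make the idea of selector matching concrete, here is a deliberately tiny sketch in plain Java. It is not how any JMS provider evaluates selectors, and ToySelector is a hypothetical name; it understands only one form of expression, a property name compared for equality with a quoted string literal, which happens to be the form used by the ReservationProcessor EJB. Real selectors support a far richer grammar.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: handles just  <property> = '<string literal>'.
final class ToySelector {
    private static final Pattern EQUALS =
        Pattern.compile("\\s*(\\w+)\\s*=\\s*'([^']*)'\\s*");

    // Returns true if the named property exists and equals the literal.
    static boolean matches(String selector, Map<String, String> properties) {
        Matcher m = EQUALS.matcher(selector);
        if (!m.matches()) {
            throw new IllegalArgumentException("unsupported selector: " + selector);
        }
        // A missing property never matches, much like a NULL in SQL.
        return m.group(2).equals(properties.get(m.group(1)));
    }
}
```

With the selector "MessageFormat = 'Version 3.4'", a message whose MessageFormat property is "Version 3.4" is selected, while a message with a different value, or with no MessageFormat property at all, is not.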

Acknowledge mode

A JMS acknowledgment means that the JMS client notifies the JMS provider (message router) when a message is received. In EJB, it's the MDB container's responsibility to send an acknowledgment when it receives a message. Acknowledging a message tells the JMS provider that an MDB container has received and processed the message. Without an acknowledgment, the JMS provider does not know whether the MDB container has received the message, and unwanted redeliveries can cause problems. For example, once we have processed a reservation message using the ReservationProcessor EJB, we don't want to receive the same message again.

The acknowledgment mode is set using the standard acknowledgeMode activation configuration property, as shown in the following code snippet:

@ActivationConfigProperty(
           propertyName="acknowledgeMode",
           propertyValue="Auto-acknowledge")

Two values can be specified for acknowledgment mode: Auto-acknowledge and Dups-ok-acknowledge. Auto-acknowledge tells the container that it should send an acknowledgment to the JMS provider soon after the message is given to an MDB instance to process. Dups-ok-acknowledge tells the container that it doesn't have to send the acknowledgment immediately; anytime after the message is given to the MDB instance will be fine. With Dups-ok-acknowledge, it's possible for the MDB container to delay acknowledgment for so long that the JMS provider assumes the message was not received and sends a "duplicate" message. Obviously, with Dups-ok-acknowledge, your MDBs must be able to handle duplicate messages correctly.
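One common way to tolerate duplicates is to remember which messages have already been handled and skip repeats. The following is a minimal sketch of that idea, assuming the message ID is available as a string (in JMS it can be read with Message.getJMSMessageID( )). DuplicateFilter is a hypothetical helper, and the in-memory set is an assumption made for brevity: it is lost on restart and is not shared between servers, so a production system would more likely record processed IDs in the database.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers message IDs that were already processed.
final class DuplicateFilter {
    // Thread-safe set, since many MDB instances may consult it concurrently.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time an ID is offered, false for any repeat.
    boolean firstDelivery(String messageId) {
        return seen.add(messageId);
    }
}
```

An onMessage( ) method could call firstDelivery( ) with the message's ID and return immediately when the result is false.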

Auto-acknowledge avoids duplicate messages because the acknowledgment is sent immediately. Therefore, the JMS provider won't send a duplicate. Most MDBs use Auto-acknowledge to avoid processing the same message twice. Dups-ok-acknowledge exists because it can allow a JMS provider to optimize its use of the network. In practice, though, the overhead of an acknowledgment is so small, and the frequency of communication between the MDB container and the JMS provider is so high, that Dups-ok-acknowledge doesn't have a big impact on performance.

Having said all of this, the acknowledgment mode is ignored most of the time. In fact, it is ignored unless the MDB executes with bean-managed transactions, or with the container-managed transaction attribute NotSupported (see Chapter 16). In all other cases, transactions are managed by the container, and acknowledgment takes place within the context of the transaction. If the transaction succeeds, the message is acknowledged. If the transaction fails, the message is not acknowledged. When using container-managed transactions with a Required transaction attribute, the acknowledgment mode is usually not specified; however, it is included in the deployment descriptor for the sake of discussion.
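The rule that acknowledgment follows the outcome of the transaction can be modeled in a few lines of plain Java. This is a toy model only: ToyTransactionalQueue is a hypothetical class, a thrown RuntimeException stands in for a rolled-back transaction, and real providers add redelivery limits, delays, and dead-letter handling that are not shown here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Toy model: a message leaves the queue only if its handler completes.
final class ToyTransactionalQueue {
    private final Deque<String> pending = new ArrayDeque<>();

    void send(String message) {
        pending.addLast(message);
    }

    // Delivers the head message once. Returns true if the handler completed
    // (commit: the message is acknowledged and removed). Returns false if the
    // queue is empty or the handler threw (rollback: the message stays at
    // the head of the queue and will be redelivered).
    boolean deliverNext(Consumer<String> handler) {
        String message = pending.peekFirst();
        if (message == null) {
            return false;
        }
        try {
            handler.accept(message);
        } catch (RuntimeException e) {
            return false; // not acknowledged
        }
        pending.removeFirst(); // acknowledged
        return true;
    }

    int size() {
        return pending.size();
    }
}
```

If a handler fails on its first attempt and succeeds on its second, the same message is delivered twice and removed only after the second attempt.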

Subscription durability

When a JMS-based MDB uses a javax.jms.Topic, the deployment descriptor must declare whether the subscription is Durable or NonDurable. A Durable subscription outlasts an MDB container's connection to the JMS provider, so if the EJB server suffers a partial failure, shuts down, or otherwise disconnects from the JMS provider, the messages that it would have received are not lost. The provider stores any messages that are delivered while the container is disconnected; the messages are delivered to the container (and from there to the MDB) when the container reconnects. This behavior is commonly referred to as store-and-forward messaging. Durable MDBs are tolerant of disconnections, whether intentional or the result of a partial failure.

If the subscription is NonDurable, any messages the bean would have received while it was disconnected are lost. Developers use NonDurable subscriptions when it is not critical for all messages to be processed. Using a NonDurable subscription improves the performance of the JMS provider but significantly reduces the reliability of the MDBs.

@ActivationConfigProperty(
      propertyName="subscriptionDurability",
      propertyValue="Durable")
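The difference between the two kinds of subscription can be seen in a small plain-Java simulation. This is a sketch, not a JMS implementation: ToyTopic and its methods are hypothetical names chosen for illustration. A durable subscription has messages stored for it while it is disconnected and receives them on reconnect; a nondurable subscription simply misses them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy topic that models durable and nondurable subscriptions.
final class ToyTopic {
    private static final class Subscription {
        final boolean durable;
        boolean connected = true;
        final List<String> received = new ArrayList<>();
        final List<String> stored = new ArrayList<>(); // held while disconnected

        Subscription(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscription> subscriptions = new HashMap<>();

    void subscribe(String name, boolean durable) {
        subscriptions.put(name, new Subscription(durable));
    }

    void disconnect(String name) {
        subscriptions.get(name).connected = false;
    }

    // On reconnect, a durable subscription receives everything stored for it.
    void reconnect(String name) {
        Subscription s = subscriptions.get(name);
        s.connected = true;
        s.received.addAll(s.stored);
        s.stored.clear();
    }

    void publish(String message) {
        for (Subscription s : subscriptions.values()) {
            if (s.connected) {
                s.received.add(message);
            } else if (s.durable) {
                s.stored.add(message); // store-and-forward
            }
            // disconnected and nondurable: the message is dropped
        }
    }

    List<String> received(String name) {
        return new ArrayList<>(subscriptions.get(name).received);
    }
}
```

If both subscribers disconnect while a message is published and then reconnect, only the durable subscriber ends up with that message.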

When the destination type is javax.jms.Queue, as is the case in the ReservationProcessor EJB, durability is not a factor because of the nature of queue-based messaging systems. With a queue, messages may be consumed only once and they remain in the queue until they are distributed to one of the queue's listeners.

The XML Deployment Descriptor

Here is a deployment descriptor that provides a complete annotation-alternative definition of the ReservationProcessor EJB:

<?xml version="1.0"?>
<ejb-jar
       xmlns="http://java.sun.com/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
                           http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd"
       version="3.0">
   <enterprise-beans>
      <message-driven>
         <ejb-name>ReservationProcessorBean</ejb-name>
         <ejb-class>
             com.titan.reservationprocessor.ReservationProcessorBean
         </ejb-class>
         <messaging-type>javax.jms.MessageListener</messaging-type>
         <transaction-type>Container</transaction-type>
         <message-destination-type>
             javax.jms.Queue
         </message-destination-type>
         <activation-config>
             <activation-config-property>
                 <activation-config-property-name>destinationType
                 </activation-config-property-name>
                 <activation-config-property-value>javax.jms.Queue
                 </activation-config-property-value>
             </activation-config-property>
             <activation-config-property>
                 <activation-config-property-name>messageSelector
                 </activation-config-property-name>
                 <activation-config-property-value>MessageFormat = 'Version 3.4'
                 </activation-config-property-value>
             </activation-config-property>
             <activation-config-property>
                 <activation-config-property-name>acknowledgeMode
                 </activation-config-property-name>
                 <activation-config-property-value>Auto-acknowledge
                 </activation-config-property-value>
             </activation-config-property>
         </activation-config>
         <ejb-local-ref>
             <ejb-ref-name>ejb/PaymentProcessor</ejb-ref-name>
             <ejb-ref-type>Session</ejb-ref-type>
             <local>com.titan.processpayment.ProcessPaymentLocal</local>
             <injection-target>
                <injection-target-class>
                   com.titan.reservationprocessor.ReservationProcessorBean
                </injection-target-class>
                <injection-target-name>process</injection-target-name>
             </injection-target>
         </ejb-local-ref>
         <persistence-context-ref>
             <persistence-context-ref-name>
                 persistence/titan
             </persistence-context-ref-name>
             <persistence-unit-name>titan</persistence-unit-name>
             <injection-target>
                <injection-target-class>
                   com.titan.reservationprocessor.ReservationProcessorBean
                </injection-target-class>
                <injection-target-name>em</injection-target-name>
             </injection-target>
         </persistence-context-ref>
         <resource-ref>
             <res-ref-name>jms/ConnectionFactory</res-ref-name>
             <res-type>javax.jms.ConnectionFactory</res-type>
             <res-auth>Container</res-auth>
             <mapped-name>ConnectionFactory</mapped-name>
             <injection-target>
                <injection-target-class>
                   com.titan.reservationprocessor.ReservationProcessorBean
                </injection-target-class>
                <injection-target-name>connectionFactory</injection-target-name>
             </injection-target>
         </resource-ref>
       </message-driven>
   </enterprise-beans>
</ejb-jar>

In addition to the <activation-config> element that describes the messaging properties, there are the <messaging-type> and <message-destination-type> elements. An MDB is declared in a <message-driven> element within the <enterprise-beans> element, alongside <session> beans. Similar to <session> bean types, it defines an <ejb-name> and a <transaction-type> but does not define component interfaces (local or remote). MDBs do not have remote or local interfaces, so these definitions aren't needed.

The ReservationProcessor Clients

In order to test the ReservationProcessor EJB, we need to develop two new client applications: one to send reservation messages and the other to consume ticket messages produced by the ReservationProcessor EJB.

The reservation message producer

The JmsClient_ReservationProducer sends 100 reservation requests very quickly. The speed with which it sends these messages forces many containers to use multiple MDB instances to process them. The code for JmsClient_ReservationProducer looks like this:

import javax.jms.Message;
import javax.jms.MapMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.Queue;
import javax.jms.MessageProducer;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import java.util.Date;

import com.titan.processpayment.CreditCardDO;

public class JmsClient_ReservationProducer {

    public static void main(String [] args) throws Exception {

        InitialContext jndiContext = getInitialContext( );

        ConnectionFactory factory = (ConnectionFactory)
            jndiContext.lookup("ConnectionFactoryNameGoesHere");

        Queue reservationQueue = (Queue)
            jndiContext.lookup("QueueNameGoesHere");

        Connection connect = factory.createConnection( );

        Session session =
            connect.createSession(false,Session.AUTO_ACKNOWLEDGE);

        MessageProducer sender = session.createProducer(reservationQueue);

        for(int i = 0; i < 100; i++){
            MapMessage message = session.createMapMessage( );
            message.setStringProperty("MessageFormat","Version 3.4");

            message.setInt("CruiseID",1);
            message.setInt("CustomerID",i%10);
            message.setInt("CabinID",i);
            message.setDouble("Price", (double)1000+i);

            // the card expires in about 30 days (computed in long arithmetic)
            Date expirationDate = new Date(
                System.currentTimeMillis( ) + 30L * 24 * 60 * 60 * 1000);
            message.setString("CreditCardNum", "923830283029");
            message.setLong("CreditCardExpDate", expirationDate.getTime( ));
            message.setString("CreditCardType", CreditCardDO.MASTER_CARD);
            sender.send(message);
        }
        connect.close( );
    }

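    public static InitialContext getInitialContext( )
        throws javax.naming.NamingException {
        // create vendor-specific JNDI context here; the no-argument
        // constructor picks up its settings from jndi.properties
        // or from system properties
        return new InitialContext( );
    }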
}

This code is very similar to the code we extended our TravelAgent EJB with earlier. It obtains a ConnectionFactory from JNDI and sets up the relevant JMS objects that will be used to send messages. It then creates 100 reservations and sends them to the JMS queue to be processed asynchronously.
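The producer carries the credit card's expiration date as a millisecond timestamp in the CreditCardExpDate field. When computing such an offset by hand, it is easy to get the magnitude wrong: 30 days is 2,592,000,000 milliseconds, which is larger than the biggest int value, so the arithmetic has to be done with long values. The following sketch shows one way to do this with java.util.concurrent.TimeUnit; ExpirationDates is a hypothetical helper name and is not part of the Titan code.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for building expiration timestamps.
final class ExpirationDates {
    // Milliseconds in the given number of days, computed as a long.
    static long daysToMillis(int days) {
        return TimeUnit.DAYS.toMillis(days);
    }

    // Date that lies the given number of days after nowMillis.
    static Date expiresIn(int days, long nowMillis) {
        return new Date(nowMillis + daysToMillis(days));
    }
}
```

A producer could then call ExpirationDates.expiresIn(30, System.currentTimeMillis( )) and pass the result's getTime( ) value to setLong( ).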

The ticket message consumer

The JmsClient_TicketConsumer is designed to consume all the ticket messages delivered by ReservationProcessor EJB instances to the queue. It consumes the messages and prints out the descriptions:

import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.Queue;
import javax.jms.MessageConsumer;
import javax.jms.JMSException;
import javax.naming.InitialContext;

import com.titan.travelagent.TicketDO;

public class JmsClient_TicketConsumer
    implements javax.jms.MessageListener {

    public static void main(String [] args) throws Exception {

        new JmsClient_TicketConsumer( );

        while(true){Thread.sleep(10000);}

    }

    public JmsClient_TicketConsumer( ) throws Exception {

        InitialContext jndiContext = getInitialContext( );

        ConnectionFactory factory = (ConnectionFactory)
            jndiContext.lookup("QueueFactoryNameGoesHere");
        Queue ticketQueue = (Queue)jndiContext.lookup("QueueNameGoesHere");
        Connection connect = factory.createConnection( );
        Session session =
            connect.createSession(false,Session.AUTO_ACKNOWLEDGE);
        MessageConsumer receiver = session.createConsumer(ticketQueue);

        receiver.setMessageListener(this);

        connect.start( );
    }

    public void onMessage(Message message) {
        try {
            ObjectMessage objMsg = (ObjectMessage)message;
            TicketDO ticket = (TicketDO)objMsg.getObject( );
            System.out.println("********************************");
            System.out.println(ticket);
            System.out.println("********************************");

        } catch(JMSException jmsE) {
            jmsE.printStackTrace( );
        }
    }
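    public static InitialContext getInitialContext( )
        throws javax.naming.NamingException {
        // create vendor-specific JNDI context here; the no-argument
        // constructor picks up its settings from jndi.properties
        // or from system properties
        return new InitialContext( );
    }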
}

To make the ReservationProcessor EJB work with the two client applications, JmsClient_ReservationProducer and JmsClient_TicketConsumer, you must configure your EJB container's JMS provider so that it has two queues: one for reservation messages and another for ticket messages.