Wednesday, October 13, 2021

JMS: Deep dive in MQ systems and Roadmap to Kafka

JMS: Java Messaging Service


JMS is an example of Messaging Systems based on asynnchronous design pattern which is used by microservices to interact with each other to send data and events. For introductory part, please refer here.
JMS allow applications to create and send, receive and read messages in a shared environment.

JMS: Programming Model


Basic components to develop client based producer and consumer system are:
  • ConnectionFactory: Use the Java Naming and Directory Interface (JNDI) to find a ConnectionFactory object, or instantiate a ConnectionFactory object directly and set its attributes.
    Based on delivery model, client has separate instance for connection factory to create a connection to a provider:
    • Point to point :  QueueConnectionFactory
    • Publish/subscribe: TopicConnectionFactory
         The following snippet of code demonstrates how to use JNDI to find a connection factory object:

Context ctx = new InitialContext();
ConnectionFactory cf1 = (ConnectionFactory) ctx.lookup("jms/QueueConnectionFactory");
ConnectionFactory cf2 = (ConnectionFactory) ctx.lookup("/jms/TopicConnectionFactory");
        
        Alternatively, you can directly instantiate a connection factory as follows:

ConnectionFactory connFactory = new com.sun.messaging.ConnectionFactory();
QueueConnectionFactory connFactory = new com.sun.messaging.QueueConnectionFactory();
TopicConnectionFactory connFactory = new com.sun.messaging.TopicConnectionFactory();

  • Connection: Use the ConnectionFactory object to create a Connection object. This can be done as follows:
Connection connection = connFactory.createConnection();

        Note that you must close all connections you have created using the Connection.close() method.

  • Session: Use the Connection object to create one or more Session objects, which provide transactional context with which to group a set of sends and receives into an atomic unit of work. A session can be created as follows:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        The createSession() method takes two arguments:
    • the first (false in this case) means that the session is not transacted,
    • the second means that the session will automatically acknowledge messages when they have been received successfully.

  • Destination: A destination object is used by the client to specify the source of messages it consumes and the target of messages it produces.
    In the point-to-point messaging - destinations are known as queues,
    In the publish/subscribe model of messaging - destinations are known as topics.




  • Use JNDI to find Destination object(s), or instantiate one directly and configure it by setting its attributes. The following snippet of code demonstrates how to perform a JNDI lookup of a queue named jms/SomeQueue:

    Destination dest = (Queue) ctx.lookup("jms/SomeQueue");

    Or, you can directly instantiate and configure a destination object:

    Queue q = new com.sun.messaging.Queue("world");

  • MessageProducer: Use a Session and a Destination object to create the needed MessageProducer object, which are used for sending messages to a destination.
    Note that you can create a MessageProducer object without specifying a Destination object, but in that case a Destination object must be specified for each message produced.

    MessageProducer producer = session.createProducer(SomeQueue OR SomeTopic);

    Once a producer has been created, it can be used to send messages as follows: producer.send(message);
  •   MessageConsumer: JMS messages can be consumed in two ways:
    • Synchronously: A client(receiver or subscriber) explicitly fetches a message from the destination using the receive method.
      • Use a Session object and a Destination object to create any needed MessageConsumer objects that are used for receiving messages.

        MessageConsumer consumer = session.createConsumer(SomeQueue or SomeTopic);
        connection.start();
        Message msg = consumer.receive(Long timeOut);

      • Once the consumer has been created, it can be used to receive messages. Message delivery, however, doesn't begin until you start the connection created earlier, which can be done by calling the start() method:
    • Asynchronously: A client can register a message listener(like event listener) with a consumer. 
      • Instantiate a MessageListener object and register it with a MessageConsumer object.
      • A MessageListener object acts as an asynchronous event handler for messages. The MessageListener interface contains one method, onMessage(), which you implement to receive and process the messages.

        MessageListener listener = new MyListener();
        consumer.setMessageListener(listener);

      • In order to avoid missing messages, the start() method should be called on the connection after the listener has been registered. When message delivery begins, the JMS provider automatically invokes the message listener's onMessage() whenever a message is delivered.

Implementation

  • Download and install Java System Message Queue. Once installed, the \bin directory contains a utility to install and uninstall the broker as a Window Service (imqsvcadmin). In addition, it contains the executable for the broker (imqbrokerd)
  • To test installation:
    • Run the broker. Go to the bin directory of your installation and run the following command: > imqbrokerd -tty
      The -tty option causes all logged messages to be displayed to the console in addition to the log file.
    • The broker should start and display a few messages before displaying: imqbroker@hostname:7676 ready
  • Test the broker by running the following command in a separate window:
    > imqcmd query bkr -u admin -p admin
    It should display all the details including host, messages count and cluster information.
Sample code(point to point):

import javax.jms.*;

public class TestJms {
   public static void main(String argv[]) throws Exception {
      // The producer and consumer need to get a connection factory and use it to set up a connection and a session
      QueueConnectionFactory connFactory = new com.sun.messaging.QueueConnectionFactory();
      QueueConnection conn = connFactory.createQueueConnection();
      // The session is not transacted, and it uses automatic message acknowledgement
      QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      Queue q = new com.sun.messaging.Queue("Hello world !!!");
     
      QueueSender sender = session.createSender(q); // Sender
      TextMessage msg = session.createTextMessage(); // Text message
      msg.setText("Hello nik !");
      System.out.println("Sending the message: "+msg.getText());
      sender.send(msg);
      
      QueueReceiver receiver = session.createReceiver(q); // Receiver
      conn.start();
      Message m = receiver.receive();
      if(m instanceof TextMessage) {
         TextMessage txt = (TextMessage) m;
         System.out.println("Message Received: "+txt.getText());
      }
      session.close();
      conn.close();
   }
}

  • Compile TestJms.java:

    > javac -classpath /lib/jms.jar{;|:}/lib/img.jar TestJms.java

    Note: The choice of PATH SEPARATER CHARACTER, {;|:}, is platform dependent.
    • ':' on UNIX/Linux, and
    • ';' on Windows
  • Run: Assuming that the imqbrokerd is still running, run TestJms:

    > java -cp /lib/jms.jar{;|:};/lib/img.jar TestJms TestJms

  • Sample Output:
    > Sending the message: Hello nik !
       Message Received: Hello nik !

Reliable Messaging

JMS defines two delivery modes:
  • Persistent messages: Guaranteed to be successfully consumed once and only once. Messages are not lost.
  • Non-persistent messages: Guaranteed to be delivered at most once. Message loss is not a concern.
This, however, is all about performance trade-offs. The more reliable the delivery of messages, the more bandwidth and overhead required to achieve that reliability. Performance can be maximized by producing non-persistent messages, or you can maximize the reliability by producing persistent messages.

Message-Driven Beans

JMS is a mandatory API and service in J2EE platform. A good example is the message-driven bean, one of a family of EJBs specified in EJB 2.0/2.1. The other two EJBs are session beans and entity beans, which can only be called synchronously.

A JMS Message-Driven Bean (MDB) is a JMS message consumer that implements the JMS MessageListener interface. The onMessage() method is invoked when a message is received by the MDB container. Note that you do not invoke remote methods on MDBs (like with other enterprise beans) and as a result there are no home or remote interfaces associated with them. It also worth noting that with J2EE 1.4, MDBs are not limited to JMS; rather, a multiplicity of MDB interfaces can be declared and consumed by application components implementing those interfaces.

Limitations of JMS:

  • JMS is Java-based. In multi-tiered applications using microservices, where multiple languages and frameworks are used, this can become a hindrance.
  • In JMS, although APIs are specified, the message format is not. This is a limitation of JMS. They just have to use the same API.

An alternate solution to these problems is using Kafka.

JMS and Kafka are both wildly popular solutions for messaging. Whilst JMS has been around for longer it is still a very popular choice for certain use cases. Before you consider which one is better, it is best to do your homework and study the business requirements and your capabilities.

A friendly comparison between JMS and Kafka as follow:

ParameterJMSKafka
Order of MessagesThere is no guarantee that the messages will be received in order.The receiving of messages follows the order in which they are sent to the partition.
FilterThis is a JMS API message selector that allows the consumers to specify which messages they are interested in. This way, message filtering happens in JMS. Message selection can follow specific criteria. The filtering occurs at the producer.There is no concept of the filter at the broker level. Hence, messages picked up by the consumer do not specify any criteria. The filtering can happen only at the consumer level.
Persistence of MessagesIt provides either in- memory or disk-based storage of messages.It stores the messages for a specified period whether or not it has been picked up by the consumer.
Push vs. Pull of MessagesThe providers push the JMS message to queues and topics.The consumers pull the message from the broker.
Load BalancingLoad balancing can be designed by implementing some clustering mechanism. Thus, once the producer sends the messages, the load will be distributed across the clusters.Here load balancing happens automatically. Because once the Kafka nodes publish its metadata that indicates which servers are up and running in the cluster. Also, it tells the producer where the leader is. Thus, the client can send messages to the appropriate partition.

For more information, please visit JMS vs Kafka.

No comments:

Post a Comment

Event Handling in Spring

Spring's event handling is single-threaded so if an event is published,  until and unless all the receivers get the message, the process...