JBoss JMS using HornetQ

This article shows how the JBoss server can leverage the messaging capabilities of HornetQ.

“HornetQ is an open source project to build a multi-protocol, embeddable, very high performance, clustered, asynchronous messaging system.”

1. Open the command prompt and set the JBOSS_HOME environment variable to the JBoss installation directory:

set JBOSS_HOME=C:\jboss-5.1.0.GA

Then set the HORNETQ_HOME environment variable to the HornetQ installation directory:

set HORNETQ_HOME=C:\hornetq-2.1.1.Final

2. Run build.bat under the HORNETQ_HOME/config/jboss-as-5 directory:

C:\hornetq-2.1.1.Final\config\jboss-as-5>build.bat

This will create 2 new profiles under $JBOSS_HOME/server directory:

–> default-with-hornetq — it corresponds to AS 5 default profile with HornetQ as its JMS provider. In this profile, HornetQ is non-clustered

–> all-with-hornetq — it corresponds to AS 5 all profile with HornetQ as its JMS provider. In this profile, HornetQ is clustered

You can then start JBoss AS 5 using one of these profiles, e.g. :

$JBOSS_HOME/bin/run.bat -c default-with-hornetq

3. Go to $JBOSS_HOME/server/default-with-hornetq/deploy/hornetq.sar

You will find a file “hornetq-jms.xml” which contains the default ConnectionFactories and the Default Queue Destinations that can be used.

Refer to the figure below.

4.  Set the CLASSPATH to include $JBOSS_HOME\client\jbossall-client.jar and all the jar files under the $HORNETQ_HOME\lib folder:

set classpath=%classpath%;%JBOSS_HOME%\client\jbossall-client.jar;%HORNETQ_HOME%\lib\hornetq-jms.jar;%HORNETQ_HOME%\lib\hornetq-core.jar;%HORNETQ_HOME%\lib\netty.jar;

5.  Compile and execute the QueueSend.java program below:

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.Hashtable;

import javax.jms.*;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

import java.util.Properties;

public class QueueSend

{

public final static String JNDI_FACTORY="org.jboss.naming.HttpNamingContextFactory";

public final static String JMS_FACTORY="ConnectionFactory";

public final static String QUEUE="/queue/DLQ";

private QueueConnectionFactory qconFactory;

private QueueConnection qcon;

private QueueSession qsession;

private QueueSender qsender;

private Queue queue;

private TextMessage msg;

public void init(Context ctx, String queueName)

throws NamingException, JMSException

{

qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);

qcon = qconFactory.createQueueConnection();

qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

queue = (Queue) ctx.lookup(queueName);

qsender = qsession.createSender(queue);

msg = qsession.createTextMessage();

qcon.start();

}

public void send(String message) throws JMSException {

msg.setText(message);

qsender.send(msg);

}

public void close() throws JMSException {

qsender.close();

qsession.close();

qcon.close();

}

public static void main(String[] args) throws Exception {

if (args.length != 1) {

System.out.println("Usage: QueueSend JBossURL");

return;

}

InitialContext ic = getInitialContext(args[0]);

QueueSend qs = new QueueSend();

qs.init(ic, QUEUE);

readAndSend(qs);

qs.close();

}

private static void readAndSend(QueueSend qs)

throws IOException, JMSException

{

BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));

String line=null;

boolean quitNow = false;

do {

System.out.print("Enter message ("quit" to quit): n");

line = msgStream.readLine();

if (line != null && line.trim().length() != 0) {

qs.send(line);

System.out.println("JMS Message Sent: "+line+"n");

quitNow = line.equalsIgnoreCase("quit");

}

} while (! quitNow);

}

private static InitialContext getInitialContext(String url)

throws NamingException

{

Properties env = new Properties();

env.setProperty(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);

env.setProperty(Context.PROVIDER_URL,url);

return new InitialContext(env);

}

}

Run QueueSend as shown below:

java QueueSend http://127.0.0.1:8080/invoker/JNDIFactory

6.  Compile and execute the QueueReceive.java program below:

import java.util.Hashtable;

import javax.jms.*;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

import java.util.Properties;

public class QueueReceive implements MessageListener

{

public final static String JNDI_FACTORY="org.jboss.naming.HttpNamingContextFactory";

public final static String JMS_FACTORY="ConnectionFactory";

public final static String QUEUE="/queue/DLQ";

private QueueConnectionFactory qconFactory;

private QueueConnection qcon;

private QueueSession qsession;

private QueueReceiver qreceiver;

private Queue queue;

private boolean quit = false;

public void onMessage(Message msg)

{

try {

String msgText;

if (msg instanceof TextMessage) {

msgText = ((TextMessage)msg).getText();

} else {

msgText = msg.toString();

}

System.out.println("Message Received: "+ msgText );

if (msgText.equalsIgnoreCase("quit")) {

synchronized(this) {

quit = true;

this.notifyAll(); // Notify main thread to quit

}

}

} catch (JMSException jmse) {

System.err.println("An exception occurred: "+jmse.getMessage());

}

}

public void init(Context ctx, String queueName)

throws NamingException, JMSException

{

qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);

qcon = qconFactory.createQueueConnection();

qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

queue = (Queue) ctx.lookup(queueName);

qreceiver = qsession.createReceiver(queue);

qreceiver.setMessageListener(this);

qcon.start();

}

public void close()throws JMSException

{

qreceiver.close();

qsession.close();

qcon.close();

}

public static void main(String[] args) throws Exception {

if (args.length != 1) {

System.out.println("Usage: QueueReceive JBossURL");

return;

}

InitialContext ic = getInitialContext(args[0]);

QueueReceive qr = new QueueReceive();

qr.init(ic, QUEUE);

System.out.println("JMS Ready To Receive Messages (To quit, send a "quit" message).");

// Wait until a "quit" message has been received.

synchronized(qr) {

while (! qr.quit) {

try {

qr.wait();

} catch (InterruptedException ie) {}

}

}

qr.close();

}

private static InitialContext getInitialContext(String url)

throws NamingException

{

Properties env = new Properties();

env.setProperty(Context.INITIAL_CONTEXT_FACTORY,JNDI_FACTORY);

env.setProperty(Context.PROVIDER_URL,url);

return new InitialContext(env);

}

}

7. Run QueueReceive as shown below:

java QueueReceive http://127.0.0.1:8080/invoker/JNDIFactory

You can download HornetQ from the link below.

http://www.jboss.org/hornetq/downloads.html

Refer to the link below for the HornetQ documentation.

http://www.jboss.org/hornetq/docs

Cheers,

Wonders Team 🙂

4 comments

    1. Thanks Kamran. 🙂

      Glad to know that the problem is resolved.

      Regards,
      Anandraj

Comments are closed.