JBoss JMS using HornetQ

This article shows how JBoss server can leverage the messaging capabilities of HornetQ.

“HornetQ is an open source project to build a multi-protocol, embeddable, very high performance, clustered, asynchronous messaging system.”

1. Open the command prompt and set the JBOSS_HOME environment to the JBoss installation directory

set JBOSS_HOME=C:\jboss-5.1.0.GA

set HORNETQ_HOME environment variable to the HornetQ installation directory.

set HORNETQ_HOME=C:\hornetq-2.1.1.Final

2.Run build.batunder HORNETQ_HOME/config/jboss-as-5directory

C:\hornetq-2.1.1.Final\config\jboss-as-5>build.bat

This will create 2 new profiles under $JBOSS_HOME/server directory:

–> default-with-hornetq — it corresponds to AS 5 default profile with HornetQ as its JMS provider. In this profile, HornetQ is non-clustered

–> all-with-hornetq — it corresponds to AS 5 all profile with HornetQ as its JMS provider. In this profile, HornetQ is clustered

You can then start JBoss AS 5 using one of these profiles, e.g. :

$JBOSS_HOME/bin/run.bat -c default-with-hornetq

3:- Go to $JBOSS_HOME/server/default-with-hornetq/deploy/hornetq.sar

You will find a file “hornetq-jms.xml” which contains the default ConnectionFactories and the Default Queue Destinations that can be used.

Refer the figure below.

4.  Set the CLASSPATH to the $JBOSS_HOME\client\jbossall-client.jar and all the jar files under $HORNETQ_HOME\lib folder

set classpath=%classpath%;$JBOSS_HOME\jbossall-client.jar;$HORNETQ_HOME\lib\hornetq-jms.jar ; $HORNETQ_HOME\lib\hornetq-core.jar ; $HORNETQ_HOME\lib\netty.jar;

5.  Compile and execute the below QueueSend.java program

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Hashtable;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class QueueSend
{
public final static String JNDI_FACTORY="org.jboss.naming.HttpNamingContextFactory";
public final static String JMS_FACTORY="ConnectionFactory";
public final static String QUEUE="/queue/DLQ";
private QueueConnectionFactory qconFactory;
private QueueConnection qcon;
private QueueSession qsession;
private QueueSender qsender;
private Queue queue;
private TextMessage msg;
public void init(Context ctx, String queueName)
throws NamingException, JMSException
{
qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
qcon = qconFactory.createQueueConnection();
qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue) ctx.lookup(queueName);
qsender = qsession.createSender(queue);
msg = qsession.createTextMessage();
qcon.start();
}
public void send(String message) throws JMSException {
msg.setText(message);
qsender.send(msg);
}
public void close() throws JMSException {
qsender.close();
qsession.close();
qcon.close();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Usage: QueueSend JBossURL");
return;
}
InitialContext ic = getInitialContext(args[0]);
QueueSend qs = new QueueSend();
qs.init(ic, QUEUE);
readAndSend(qs);
qs.close();
}
private static void readAndSend(QueueSend qs)
throws IOException, JMSException
{
BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));
String line=null;
boolean quitNow = false;
do {
System.out.print("Enter message (\"quit\" to quit): \n");
line = msgStream.readLine();
if (line != null && line.trim().length() != 0) {
qs.send(line);
System.out.println("JMS Message Sent: "+line+"\n");
quitNow = line.equalsIgnoreCase("quit");
}
} while (! quitNow);
}
private static InitialContext getInitialContext(String url)
throws NamingException
{
Properties env = new Properties();
env.setProperty(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
env.setProperty(Context.PROVIDER_URL,url);
return new InitialContext(env);
}
}

6: Run the QueueSend.java as below

java QueueSend http://127.0.0.1:8080/invoker/JNDIFactory

6.  Compile and execute the below QueueReceive.java program

import java.util.Hashtable;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class QueueReceive implements MessageListener
{
public final static String JNDI_FACTORY="org.jboss.naming.HttpNamingContextFactory";
public final static String JMS_FACTORY="ConnectionFactory";
public final static String QUEUE="/queue/DLQ";
private QueueConnectionFactory qconFactory;
private QueueConnection qcon;
private QueueSession qsession;
private QueueReceiver qreceiver;
private Queue queue;
private boolean quit = false;
public void onMessage(Message msg)
{
try {
String msgText;
if (msg instanceof TextMessage) {
msgText = ((TextMessage)msg).getText();
} else {
msgText = msg.toString();
}
System.out.println("Message Received: "+ msgText );
if (msgText.equalsIgnoreCase("quit")) {
synchronized(this) {
quit = true;
this.notifyAll(); // Notify main thread to quit
}
}
} catch (JMSException jmse) {
System.err.println("An exception occurred: "+jmse.getMessage());
}
}
public void init(Context ctx, String queueName)
throws NamingException, JMSException
{
qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
qcon = qconFactory.createQueueConnection();
qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue) ctx.lookup(queueName);
qreceiver = qsession.createReceiver(queue);
qreceiver.setMessageListener(this);
qcon.start();
}
public void close()throws JMSException
{
qreceiver.close();
qsession.close();
qcon.close();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Usage: QueueReceive JBossURL");
return;
}
InitialContext ic = getInitialContext(args[0]);
QueueReceive qr = new QueueReceive();
qr.init(ic, QUEUE);
System.out.println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");
// Wait until a "quit" message has been received.
synchronized(qr) {
while (! qr.quit) {
try {
qr.wait();
} catch (InterruptedException ie) {}
}
}
qr.close();
}
private static InitialContext getInitialContext(String url)
throws NamingException
{
Properties env = new Properties();
env.setProperty(Context.INITIAL_CONTEXT_FACTORY,JNDI_FACTORY);
env.setProperty(Context.PROVIDER_URL,url);
return new InitialContext(env);
}
}

7. Run the QueueReceive.java as below

java QueueReceive http://127.0.0.1:8080/invoker/JNDIFactory

You can download HornetQ from the below link.

http://www.jboss.org/hornetq/downloads.html

Refer the below link for HornetQ documentation.

http://www.jboss.org/hornetq/docs

Cheers,

Wonders Team :-)

Latest Comments

  1. Bartosz Jagodzinski December 9, 2010
    • Administrator December 10, 2010
  2. Kamran June 26, 2011
    • anandraj June 27, 2011

Leave a Reply