Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / All-Topics

ActiveMQ JMS Code to Get Queue Messages

5.00/5 (1 vote)
14 Dec 2016CPOL 5.2K  
CodeProject Recently I need to create a java application to get messages from the queues on an ActiveMQ JMS server. Here is the code I developed:


Recently I need to create a java application to get messages from the queues on an ActiveMQ JMS server. Here is the code I developed:

/*
 *
 */

import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

public class getQueue{
	// Global parameters.
	static String host;
	static String queueName;
	static String logFile;
	static boolean append;

	public static void main(String[] args) throws Exception {
		if(args.length != 4){
			System.out.println("Usage: java getQueue <ActiveMQ.host.name> <queue> <logFile> <append(y/n)>");
			System.exit(-1);
		}
		else{
			host = args[0];
			queueName = args[1];
			logFile = args[2];
			if(args[3].toLowerCase().equals("y")){
				append = true;
			}
			else{
				append = false;
			}
			// Check if the file exists first.
			if(!append){
				File test = new File(logFile);
				if(test.exists()){
					System.err.println("The log file: " + test.toString() + " exists!\r\nExiting!"
					System.exit(-1);
				}
			}
		}

		thread(new getMessage(), false);
	}

    public static void thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }

    public static class getMessage implements Runnable, ExceptionListener {
        public void run() {
            try {

                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://" + host + ":61616");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                connection.setExceptionListener(this);

                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Queue queue = session.createQueue(queueName);

                // Create a log file to log the messages.
                PrintWriter writer = new PrintWriter(new FileWriter(logFile, append));

                // Create a MessageConsumer to get the messages.
                MessageConsumer con = session.createConsumer(queue);
                while(true){
                	Message tempMsg = null;
                	if((tempMsg = con.receive(1000)) != null){
	                	// Get correlation ID and then message text, and write to log file.
	            		String msgCorrId = tempMsg.getJMSCorrelationID();
	                    writer.write("CorrelationID: " + msgCorrId + "|");
	                    if(tempMsg instanceof javax.jms.TextMessage){
	                    	writer.write( ((javax.jms.TextMessage)tempMsg).getText() + "\r\n" );
	                    	writer.flush();
	                    	System.out.print('.');
	                    }
                	}
                }
                //writer.close();

            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occured.  Shutting down client.");
        }
    }
}


License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)