package com.aws.sample.amazonmq; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement; import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder; import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest; import com.amazonaws.services.simplesystemsmanagement.model.GetParameterResult; class WrapInt { public int v = 0; } public class AMQPClient { private static final DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss.S"); public static void main(String[] args) throws Exception { CommandLine cmd = parseAndValidateCommandLineArguments(args); final WrapInt count = new WrapInt(); final long ds = System.currentTimeMillis(); final int interval = Integer.parseInt(cmd.getOptionValue("interval", "1000")); String name = cmd.getOptionValue("name", UUID.randomUUID().toString()); int deliveryMode = cmd.hasOption("notPersistent") ? DeliveryMode.NON_PERSISTENT : DeliveryMode.PERSISTENT; registerShutdownHook(count, ds, interval); try { String user = null; String password = null; String secrets = null; if (cmd.hasOption("user") && cmd.hasOption("password")) { user = cmd.getOptionValue("user"); password = cmd.getOptionValue("password"); } else { secrets = getUserPassword("MQBrokerUserPassword"); if (secrets!=null && !secrets.isEmpty()) { user = secrets.split(",")[0]; password = secrets.split(",")[1]; } } JmsConnectionFactory connFact = new JmsConnectionFactory(user, password, cmd.getOptionValue("url")); JmsConnection conn = (JmsConnection) connFact.createConnection(); conn.setClientID("AmazonMQWorkshop-" + System.currentTimeMillis()); conn.start(); Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); if (cmd.getOptionValue("mode").contentEquals("sender")) { if (cmd.getOptionValue("type").contentEquals("queue")) { MessageProducer queueMessageProducer = session.createProducer(session.createQueue(cmd.getOptionValue("type") + "://" + cmd.getOptionValue("destination"))); sendMessages(session, queueMessageProducer, name, interval, deliveryMode, count); } else { MessageProducer topicMessageProducer = session.createProducer(session.createTopic(cmd.getOptionValue("type") + "://" + cmd.getOptionValue("destination"))); sendMessages(session, topicMessageProducer, name, interval, deliveryMode, count); } } else { if (cmd.getOptionValue("type").contentEquals("queue")) { MessageConsumer queueConsumer = session.createConsumer(session.createQueue(cmd.getOptionValue("destination"))); receiveMessages(session, queueConsumer); } else { MessageConsumer topicConsumer = session.createConsumer(session.createTopic(cmd.getOptionValue("destination"))); receiveMessages(session, topicConsumer); } } } catch (javax.jms.JMSSecurityException ex) { System.out.println(String.format("Error: %s", ex.getMessage())); System.exit(1); } } private static void sendMessages(Session session, MessageProducer messageProducer, String name, int interval, int deliveryMode, WrapInt count) throws JMSException { String destination = messageProducer.getDestination().toString(); while (true) { count.v++; String id = UUID.randomUUID().toString(); TextMessage message = session.createTextMessage(String.format("[%s] [%s] Message number %s", destination, name, count.v)); message.setJMSMessageID(id); message.setJMSCorrelationID(id); messageProducer.send(message, deliveryMode, 0, -1); if (interval > 0) { System.out.println(String.format("%s - Sender: sent '%s'", df.format(new Date()), message.getText())); try { Thread.sleep(interval); } catch (InterruptedException e) { e.printStackTrace(); } } } } private static void receiveMessages(Session session, MessageConsumer consumer) throws JMSException { consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; System.out.println(String.format("%s - Receiver: received '%s'", df.format(new Date()), msg.getText())); } else if (message instanceof BytesMessage) { BytesMessage msg = (BytesMessage) message; byte[] content = new byte[(int)msg.getBodyLength()]; msg.readBytes(content); System.out.println(String.format("%s - Receiver: received '%s'", df.format(new Date()), new String(content))); } else { System.out.println(String.format("%s - Receiver: received '%s'", df.format(new Date()), message)); } message.acknowledge(); } catch (JMSException e) { throw new RuntimeException(e); } }}); } private static CommandLine parseAndValidateCommandLineArguments(String[] args) throws ParseException { Options options = new Options(); options.addOption("help", false, "Print the help message."); options.addOption("url", true, "The broker connection url."); options.addOption("user", true, "The user to connect to the broker."); options.addOption("password", true, "The password for the user."); options.addOption("mode", true, "Whether to act as 'sender' or 'receiver'"); options.addOption("type", true, "Whether to use a queue or a topic."); options.addOption("destination", true, "The name of the queue or topic"); options.addOption("name", true, "The name of the sender"); options.addOption("interval", true, "The interval in msec at which messages are generated. Default 1000"); options.addOption("notPersistent", false, "Send messages in non-persistent mode"); CommandLineParser parser = new DefaultParser(); CommandLine cmd = parser.parse(options, args); if (cmd.hasOption("help")) { printUsage(options); } if (!(cmd.hasOption("url") && cmd.hasOption("type") && cmd.hasOption("mode") && cmd.hasOption("destination"))) { printUsage(options); } return cmd; } private static void printUsage(Options options) throws ParseException { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp( "java -jar amqp-client.jar -url <url> -user <user> -password <password> -mode <sender|receiver> -type <queue|topic> -destination <destination> [-name <name> -interval <interval> -notPersistent]", options); System.exit(1); } private static void registerShutdownHook(final WrapInt count, final long ds, final int interval) { Thread shutdown = new Thread(new Runnable(){ long d = ds; double rate_theor = interval > 0 ? 1000.0 / interval : 0; public void run() { long delta = System.currentTimeMillis() - d; System.err.print(String.format("\nMessages: %d Seconds: %f Rate: %f/sec vs %f/sec", count.v, delta/1000.0, 1000.0*count.v/delta, rate_theor)); } }); Runtime.getRuntime().addShutdownHook(shutdown); } public static String getUserPassword(String key) { GetParameterResult parameterResult = AWSSimpleSystemsManagementClientBuilder.defaultClient().getParameter(new GetParameterRequest() .withName(key)); return parameterResult.getParameter().getValue(); } }