The primary publish and subscribe
message broker clients are the PublisherBean and SubscriberBean
classes.
- The NetReader can be dynamically
located on a network by a NetWriter based on the reader's uniquely assigned
service name. Alternatively, a writer can explicitly identify the remote
reader's host name and port address.
- Information (messages) can be
passed between a writer and reader in several formats. These formats
include byte arrays (for binary data), Strings (for simple text information),
data records (for record-oriented data), or topical messages (that can contain
binary, String or data record information).
See the Java source code located in
/osmq/sample for samples of reader and writer applications. Below is an
example of a simple reader / writer pair that passes information in string format:
PublisherBean
Summary
A PublisherBean opens a connection to a named message broker and begins passing
point-to-point and/or topical messages to the broker. Messages are queued
by the broker for the named recipients and registered topical subscribers.
Messages will be retrieved in the order in which they were sent by the PublisherBean.
Sample Code
package osmq.samplecode.basic;
import java.awt.*;
import java.awt.event.*;
import java.io.*;
import java.util.*;
import osmq.clients.Publisher;
import osmq.clients.PublisherBean;
import osmq.util.*;
import osmq.util.ExceptionListener;
import osmq.messages.*;
import java.util.Calendar;
import java.util.Date;
/**
* Sample code that uses a PublisherBean to attach to a remote
* message broker and publish a set of topical messages.
*/
public class PubSample implements ExceptionListener
{
private static long MAX_PUB = 1000000;
private long written = 0;
private Publisher publisher;
private String topicname = "CUSTOMER";
private boolean isOpen = false;
DataSetMessage message;
public PubSample()
{}
public void open() throws IOException
{
// Create a notification message with 4 elements
and a topic of "CLIENT"
message = MessageFactory.createNotificationMessage(4,
"CLIENT");
// Create a publisher, and set my unique client
ID.
// The 'originator' attribute on messages that I
publish
// will default to this client ID
publisher = new PublisherBean();
publisher.setBrokerName("BROKER");
publisher.setClientID("SUBSAMPLE");
// Next I register to be notified regarding any
broker-related exceptions.
// My public function onException(Exception e) will
be called in that event.
publisher.setExceptionListener(this);
// Open the connection to the message broker.
This performs dynamic
// discovery and then creates a TCP socket connection
to the broker.
publisher.open();
}
public void publishAll() throws IOException, MessageException
{
while(written++ < MAX_PUB)
publisher.publish(getNextMessageValues());
}
public void onException(Exception e)
{
System.err.println("Failure event notification:
"
+ e.getMessage());
this.close();
System.exit(1);
}
// This method represents the method that builds the message content.
private DataSetMessage getNextMessageValues()
{
// I indicate the topical event is either an "ADD"
"UPDATE"
// or "DELETE" transaction so that downstream
datamarts can be
// maintained accordingly.
// This is an optional header field
message.setTransactionAsChar(MessageAttributes.TRANS_ADD);
// I identify the value for the primary (unique)
key
// that is used for table-level database synchronization
of
// downstream datamart maintenance subscribers.
// This is an optional header field
message.setKey("PRIMARY_KEY");
// I clear all former message body values
message.clearBody();
// I set each element in the transaction body.
message.setString(1, "JONE");
message.setString(2, "FRANCIS");
message.setDouble(3, 500.00 + getRandom(100000));
message.setInt(4, 600);
// Finally, I return the message object
return message;
}
private double getRandom(int multiplier)
{
return java.lang.Math.random() * multiplier;
}
// Closes the publisher bean and disconnects from the broker
private void close()
{
try{publisher.close();}
catch(Exception e){}
finally{publisher = null;}
}
public static void main(String[] args)
{
PubSample sp = new PubSample();
try
{
sp.open();
sp.publishAll();
}
catch(Exception e){}
finally{sp.close();}
}
}
SubscriberBean
Sample Code
package osmq.samplecode.basic;
import java.awt.*;
import java.awt.event.*;
import java.io.*;
import osmq.clients.Subscriber;
import osmq.clients.SubscriberBean;
import osmq.util.*;
import osmq.messages.*;
import osmq.util.ExceptionListener;
/**
* Sample code that uses a SubscriberBean to attach to a remote
* message broker and retrieve a set of topical messages.
*/
public class SubSample implements ExceptionListener, MessageListener
{
private int ctr = 0;
private Subscriber bean;
private boolean isOpen = false;
public SubSample()
{
bean = new SubscriberBean();
}
public void open() throws Exception
{
bean.setBrokerName("BROKER");
// First I identify myself to the message broker
by ID. This
// is a mandatory value, to be set before opening
the session.
bean.setClientID("SUBSAMPLE");
// Subscribe to one or more topics.
bean.addTopic("CLIENT");
// A MessageListener is a class with a public
onMessage(DataSetMessage) function.
// Subscribers must identify an instance of MessageListener
that will
// receive topical messages. Since SampleSubscriber
is a MessageListener,
// this will be registered as the MessageListener
bean.setMessageListener(this);
// Next I register to be notified regarding any
broker-related exceptions.
// My public function onException(Exception e) will
be called in that event.
bean.setExceptionListener(this);
// Connect to the broker, which locates the message
server, connects to it,
// and begins a flow of messages from the broker
// to my onMessage(DataSetMessage) function.
bean.open();
}
/**
* Public function called by the subcription handler when
a DataSetMessage
* arrives, based either on my subcription to topic(s)
(PUB-SUB), or simply
* addressed to my client ID (POINT-TO-POINT)
*/
public void onMessage(Message ms)
{
if(ms.getFormat() != MessageFormat.DATASET)
throw new IllegalArgumentException("Message
type is not dataset");
// cast the message as a dataset message so I
can access the various elements
DataSetMessage m = (DataSetMessage) ms;
// Display key fields from every 5000th message
if((++ctr % 5000) != 0 )
return;
// reference the SSN and last name elements as
strings
System.out.println("Last name is " + m.getString(1));
System.out.println("First name is "
+ m.getString(2));
// reference the salary element as a double
System.out.println("Salary is " + m.getDouble(3));
// reference the units element as an integer
System.out.println("Shares is " + m.getInt(4));
}
/**
* Called if there is a serious broker exception.
*/
public void onException(Exception e)
{
byte buffer[] = new byte[10];
System.out.println("Failure event notification:
" + e.getMessage());
try{System.in.read(buffer);}
catch(IOException z){}
close();
System.exit(1);
}
private void close()
{
try{bean.close();}
catch(Exception e){}
}
public static void main(String[] args)
{
SubSample sample = null;
try {
sample = new SubSample();
sample.open();
// wait for user to press
the enter key
System.in.read(new byte[100]);
}
catch (Exception e)
{
e.printStackTrace();
}
finally{
if(sample != null)
sample.close();
System.exit(0);
}
}
}