Peer-to-Peer and Publish / Subscribe Client Classes

Peer-to-Peer Clients

The primary peer-to-peer message-reading and message-writing classes are the NetReader and NetWriter.  

  • The NetReader can be dynamically located on a network by a NetWriter based on the reader's uniquely assigned service name. Alternatively, a writer can explicitly identify the remote reader's host name and port address.  
  • Information (messages) can be passed between a writer and reader in several formats.  These formats include byte arrays (for binary data), Strings (for simple text information), data records (for record-oriented data), or topical messages (that can contain binary, String or data record information).  
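
The two addressing options in the first bullet can be pictured as a lookup table keyed by service name. The sketch below is only an illustration using the standard JDK; it is not how OSMQ performs discovery, and ServiceDirectorySketch, register(), locate(), explicit() and the host name reader-host.example are all hypothetical names introduced here.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup table illustrating the two addressing options; not OSMQ code. */
final class ServiceDirectorySketch {
    private final Map<String, InetSocketAddress> services = new HashMap<>();

    /** A reader announces the service name it listens under. */
    void register(String serviceName, String host, int port) {
        services.put(serviceName, InetSocketAddress.createUnresolved(host, port));
    }

    /** Option 1: a writer locates the reader by service name alone. */
    InetSocketAddress locate(String serviceName) {
        InetSocketAddress address = services.get(serviceName);
        if (address == null) {
            throw new IllegalArgumentException("Unknown service: " + serviceName);
        }
        return address;
    }

    /** Option 2: a writer skips the lookup and names host and port directly. */
    static InetSocketAddress explicit(String host, int port) {
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        ServiceDirectorySketch directory = new ServiceDirectorySketch();
        directory.register("SAFESTORE", "reader-host.example", 3843);

        InetSocketAddress byName = directory.locate("SAFESTORE");
        InetSocketAddress byAddress = explicit("reader-host.example", 3843);

        // Both routes end at the same endpoint, so this prints "true".
        System.out.println(byName.equals(byAddress));
    }
}
```

Locating by name keeps the writer independent of where the reader runs, while explicit addressing needs no lookup step.
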
See the Java source code located in /osmq/samplecode for samples of reader and writer applications.  Below is an example of a simple reader / writer pair that passes information in string format:

NetReader

Summary

A subscriber instantiates a NetReader object and sets the NetReader's service name and listening port.  The service name is used by a remote publisher to dynamically locate and connect to the NetReader. One or more connections can be accepted and handled simultaneously by the NetReader.  Once the NetReader is opened, the application calls its getNextString() function to retrieve messages.  Messages are retrieved in the order in which they were sent by the remote publisher(s).  
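
The ordering behavior described above can be pictured as several connection handlers feeding one first-in, first-out queue that the application drains. The sketch below uses only the standard JDK and is not the NetReader implementation; InboundQueueSketch, deliver() and sender() are hypothetical names, and each simulated connection is just a thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a reader's inbound queue; not OSMQ code. */
final class InboundQueueSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Called by a connection handler each time a message arrives. */
    void deliver(String message) throws InterruptedException {
        queue.put(message);
    }

    /** Blocks until a message is available, then returns the oldest one. */
    String getNextString() throws InterruptedException {
        return queue.take();
    }

    /** Simulates one remote connection that sends three numbered messages. */
    static Thread sender(InboundQueueSketch reader, String name) {
        return new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    reader.deliver(name + "-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) throws Exception {
        InboundQueueSketch reader = new InboundQueueSketch();

        // Two connections are handled at the same time.
        Thread a = sender(reader, "A");
        Thread b = sender(reader, "B");
        a.start();
        b.start();
        a.join();
        b.join();

        // Messages from A and B may interleave, but each sender's own
        // messages come out in the order that sender wrote them.
        for (int i = 0; i < 6; i++) {
            System.out.println("Next value = " + reader.getNextString());
        }
    }
}
```

With more than one sender, the order is guaranteed only within each sender's stream; how messages from different senders interleave depends on arrival timing.
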

Sample Code

import osmq.net.NetReader;

// The class name ReaderSample is chosen here for illustration
public class ReaderSample
    {
    public static void main(String[] args)
        {
        NetReader reader = null;
        try{
           reader = new NetReader();

           // Identify the unique service name
           reader.setServiceName("SAFESTORE");

           // Define the listening port
           reader.setPort(3843);

           // If queue reaches 1000 records,
           // overflow remainder to disk
           reader.setOverflowSize(1000);

           // Open the connection for message reception
           reader.open();

           // Fetch messages as strings and print them to the console
           for(;;)
              {
              System.out.println("Next value = " +
                                 reader.getNextString());
              }
           }
        catch(Exception e)
            {
            System.out.println("Reader Exception: " + e.getMessage());
            // Wait for the user to press Enter before exiting
            try{System.in.read(new byte[10]);}
            catch(Exception ignore){}
            }
        finally
            {
            // The reader is still null if construction failed
            if(reader != null)
                {
                try{reader.close();}
                catch(Exception ignore){}
                }
            }
        }
    }

NetWriter

Sample Code

import osmq.net.NetWriter;

// The class name WriterSample is chosen here for illustration
public class WriterSample
    {
    public static void main(String[] args)
        {
        NetWriter writer = null;

        try{
           // Instantiate the publisher
           writer = new NetWriter();

           // Identify the remote service
           writer.setRemoteServiceName("SAFESTORE");

           // Connect to the service
           writer.open();

           // Write messages to the service
           for(int i = 0; i < 1000000; ++i)
              writer.write("HELLO");
           }
        catch(Exception e)
            {
            System.out.println("Writer Exception: " + e.getMessage());
            // Wait for the user to press Enter before exiting
            try{System.in.read(new byte[10]);}
            catch(Exception ignore){}
            }
        finally
            {
            // Close the connection, even if a write failed
            if(writer != null)
                {
                try{writer.close();}
                catch(Exception ignore){}
                }
            }
        }
    }

Publish / Subscribe Clients

The primary publish and subscribe message broker clients are the PublisherBean and SubscriberBean classes.

See the Java source code located in /osmq/sample for samples of publisher and subscriber applications.  Below is an example of a simple publisher / subscriber pair that passes topical messages:

PublisherBean

Summary

A PublisherBean opens a connection to a named message broker and begins passing point-to-point and/or topical messages to the broker. Messages are queued by the broker for the named recipients and registered topical subscribers.  Messages will be retrieved in the order in which they were sent by the PublisherBean. 
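
The routing described above can be pictured as one queue per client, filled either by topic subscription or by direct addressing. The sketch below is a minimal in-memory illustration using only the standard JDK; it is not the OSMQ broker, and BrokerSketch, subscribe(), publish(), send() and poll() are hypothetical names that do not correspond to the PublisherBean or SubscriberBean API.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical in-memory broker used only to illustrate routing; not OSMQ code. */
final class BrokerSketch {
    // One FIFO queue per client ID.
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // Topic name mapped to the client IDs subscribed to it.
    private final Map<String, Set<String>> subscribers = new HashMap<>();

    void subscribe(String clientId, String topic) {
        subscribers.computeIfAbsent(topic, t -> new LinkedHashSet<>()).add(clientId);
    }

    /** Topical publish: copy the message to every registered subscriber's queue. */
    void publish(String topic, String message) {
        Set<String> ids = subscribers.getOrDefault(topic, Collections.<String>emptySet());
        for (String clientId : ids) {
            queueFor(clientId).add(message);
        }
    }

    /** Point-to-point: queue the message for one named recipient. */
    void send(String recipientId, String message) {
        queueFor(recipientId).add(message);
    }

    /** Returns the oldest queued message for the client, or null if none is waiting. */
    String poll(String clientId) {
        return queueFor(clientId).poll();
    }

    private Queue<String> queueFor(String clientId) {
        return queues.computeIfAbsent(clientId, id -> new ArrayDeque<>());
    }

    public static void main(String[] args) {
        BrokerSketch broker = new BrokerSketch();
        broker.subscribe("SUBSAMPLE", "CLIENT");

        broker.publish("CLIENT", "first");     // topical
        broker.send("SUBSAMPLE", "second");    // point-to-point
        broker.publish("CLIENT", "third");     // topical

        // Prints first, second, third: the order in which they were sent.
        System.out.println(broker.poll("SUBSAMPLE"));
        System.out.println(broker.poll("SUBSAMPLE"));
        System.out.println(broker.poll("SUBSAMPLE"));
    }
}
```

Because topical and point-to-point messages land in the same per-client queue, a subscriber sees one ordered stream regardless of how each message was addressed.
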

Sample Code

package osmq.samplecode.basic;


import java.io.IOException;

import osmq.clients.Publisher;
import osmq.clients.PublisherBean;
import osmq.util.*;
import osmq.util.ExceptionListener;
import osmq.messages.*;

/**
* Sample code that uses a PublisherBean to attach to a remote
* message broker and publish a set of topical messages.
*/
public class PubSample implements ExceptionListener
{
private static final long MAX_PUB = 1000000;
private long written = 0;
private Publisher publisher;
// Must match the topic that subscribers register for
private String topicname = "CLIENT";
private boolean isOpen = false;
DataSetMessage message;


public PubSample()
    {}


public void open() throws IOException
    {
    // Create a notification message with 4 elements and a topic of "CLIENT"
    message = MessageFactory.createNotificationMessage(4, topicname);

    // Create a publisher, and set my unique client ID.
    // The 'originator' attribute on messages that I publish
    // will default to this client ID. The subscriber sample
    // identifies itself as "SUBSAMPLE", so I use a different ID.
    publisher = new PublisherBean();
    publisher.setBrokerName("BROKER");
    publisher.setClientID("PUBSAMPLE");

    // Next I register to be notified regarding any broker-related exceptions.
    // My public function onException(Exception e) will be called in that event.
    publisher.setExceptionListener(this);

    // Open the connection to the message broker. This performs dynamic
    // discovery and then creates a TCP socket connection to the broker.
    publisher.open();
    }

public void publishAll() throws IOException, MessageException
    {
    // Publish MAX_PUB messages, rebuilding the message body each time
    while(written++ < MAX_PUB)
        publisher.publish(getNextMessageValues());
    }

public void onException(Exception e)
    {
    System.err.println("Failure event notification: "
    + e.getMessage());
    this.close();
    System.exit(1);
    }

// Builds the content of the next message to publish.
private DataSetMessage getNextMessageValues()
    {
    // I indicate the topical event is either an "ADD", "UPDATE",
    // or "DELETE" transaction so that downstream datamarts can be
    // maintained accordingly.
    // This is an optional header field
    message.setTransactionAsChar(MessageAttributes.TRANS_ADD);

    // I identify the value for the primary (unique) key
    // that is used for table-level database synchronization of
    // downstream datamart maintenance subscribers.
    // This is an optional header field
    message.setKey("PRIMARY_KEY");

    // I clear all former message body values
    message.clearBody();

    // I set each element in the transaction body.
    message.setString(1, "JONE");
    message.setString(2, "FRANCIS");
    message.setDouble(3, 500.00 + getRandom(100000));
    message.setInt(4, 600);

    // Finally, I return the message object
    return message;
    }

private double getRandom(int multiplier)
    {
    return java.lang.Math.random() * multiplier;
    }

// Closes the publisher bean and disconnects from the broker
private void close()
    {
    try{publisher.close();}
    catch(Exception e){}
    finally{publisher = null;}
    }

public static void main(String[] args)
    {
    PubSample sp = new PubSample();

    try
        {
        sp.open();
        sp.publishAll();
        }
    // Report the failure instead of silently discarding it
    catch(Exception e){e.printStackTrace();}
    finally{sp.close();}
    }
}

SubscriberBean

Sample Code

package osmq.samplecode.basic;

import java.io.IOException;

import osmq.clients.Subscriber;
import osmq.clients.SubscriberBean;
import osmq.util.*;
import osmq.messages.*;
import osmq.util.ExceptionListener;

/**
* Sample code that uses a SubscriberBean to attach to a remote
* message broker and retrieve a set of topical messages.
*/
public class SubSample implements ExceptionListener, MessageListener
{
private int ctr = 0;
private Subscriber bean;
private boolean isOpen = false;

public SubSample()
    {
    bean = new SubscriberBean();
    }

public void open() throws Exception
    {
    bean.setBrokerName("BROKER");
    // First I identify myself to the message broker by ID. This
    // is a mandatory value, to be set before opening the session.
    bean.setClientID("SUBSAMPLE");

    // Subscribe to one or more topics.
    bean.addTopic("CLIENT");

    // A MessageListener is a class with a public onMessage(Message) function.
    // Subscribers must identify an instance of MessageListener that will
    // receive topical messages. Since SubSample is a MessageListener,
    // this instance will be registered as the MessageListener
    bean.setMessageListener(this);

    // Next I register to be notified regarding any broker-related exceptions.
    // My public function onException(Exception e) will be called in that event.
    bean.setExceptionListener(this);

    // Connect to the broker, which locates the message server, connects to it,
    // and begins a flow of messages from the broker
    // to my onMessage(Message) function.
    bean.open();
    }

/**
* Public function called by the subscription handler when a message
* arrives, based either on my subscription to topic(s) (PUB-SUB), or simply
* addressed to my client ID (POINT-TO-POINT)
*/
public void onMessage(Message ms)
    {
    if(ms.getFormat() != MessageFormat.DATASET)
        throw new IllegalArgumentException("Message type is not dataset");

    // cast the message as a dataset message so I can access the various elements
    DataSetMessage m = (DataSetMessage) ms;

    // Display key fields from every 5000th message
    if((++ctr % 5000) != 0 )
        return;

    // reference the last name and first name elements as strings
    System.out.println("Last name is " + m.getString(1));

    System.out.println("First name is " + m.getString(2));

    // reference the salary element as a double
    System.out.println("Salary is " + m.getDouble(3));

    // reference the shares element as an integer
    System.out.println("Shares is " + m.getInt(4));
    }

/**
* Called if there is a serious broker exception.
*/
public void onException(Exception e)
    {
    byte buffer[] = new byte[10];
    System.out.println("Failure event notification: " + e.getMessage());
    try{System.in.read(buffer);}
    catch(IOException z){}
    close();
    System.exit(1);
    }

private void close()
    {
    try{bean.close();}
    catch(Exception e){}
    }

public static void main(String[] args)
    {
    SubSample sample = null;
    try {
        sample = new SubSample();
        sample.open();
        // wait for user to press the enter key
        System.in.read(new byte[100]);
        }
    catch (Exception e)
        {
        e.printStackTrace();
        }
    finally{
        if(sample != null)
            sample.close();
        System.exit(0);
        }
    }
}