// xmlBlaster/demo/javaclients/PtpSend.java
import org.jutils.log.LogChannel;
import org.xmlBlaster.util.*;
import org.xmlBlaster.client.*;
import org.xmlBlaster.client.protocol.XmlBlasterConnection;
import org.xmlBlaster.engine.helper.MessageUnit;
import org.xmlBlaster.engine.helper.Destination;
import java.io.*;
//import java.lang.*;


/**
 * A sender client connect to xmlBlaster,
 * the sender sends PtP (point to point) messages to the client "receiver"
 * <p />
 * Use this client as a partner for PtpReceive.java to play with xmlBlaster
 * <p />
 * Invoke:
 * <pre>
 * Start this sender:
 *
 *  java PtpSend
 *     (get exception if message is not delivered)
 *
 *  java PtpSend -numSend 1000 -delay 2000
 *     (send 1000 messages, sleep 2 sec in between)
 *
 *  java PtpSend -forceQueuing true
 *     (message is queued if user 'receiver' is offline)
 *
 * Start a receiver:
 *
 *  java PtpReceive
 *
 * </pre>
 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
 */
//public class PtpSend
public class PtpSendInput
{
   private final String ME = "PtpSend";
   private XmlBlasterConnection sender = null;
   private final String senderName = "sender";
   private final String receiverName = "receiver";

   public PtpSendInput(final Global glob) {

//   public PtpSend(final Global glob) {
      
      final LogChannel log = glob.getLog(null);

      try {

         // setup the sender client ...
         sender = new XmlBlasterConnection(glob);

         ConnectQos qos = new ConnectQos(glob, senderName, "secret");
         ConnectReturnQos conRetQos = sender.connect(qos, new I_Callback() {
            public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
               log.info(senderName, "Receiving asynchronous message '" + updateKey.getOid() + "' in default handler");
               return "";
            }
         });  // Login to xmlBlaster, default handler for updates

         log.info(senderName, "Sender connected to xmlBlaster.");

         int numSend = glob.getProperty().get("numSend", 100);
         long delay = glob.getProperty().get("delay", 2000);
         log.info(ME, "Send " + numSend + " messages to '" + receiverName + "' sleeping " + delay + " millis inbetween");
         for (int ii=0; ii<numSend; ii++) {
            try {
               PublishKeyWrapper pk = new PublishKeyWrapper("PtpSend", "text/plain", "1.0");

               PublishQosWrapper pq = new PublishQosWrapper();
					// set to be durable
					pq.setDurable(true);
					
               Destination dest = new Destination(receiverName);
               dest.forceQueuing(glob.getProperty().get("forceQueuing", false));
               pq.addDestination(dest);

// Read in file
               String file = glob.getProperty().get("file", "");
               MessageUnit msgUnit = new MessageUnit(pk.toXml(), "Hi".getBytes(), pq.toXml());
					if (file.length() > 0) 
					{
					
					File inFile = new File(file);
               byte [] b=new byte[(int) inFile.length()];
               System.out.print("file name is " + file + "; length is " + inFile.length());
               System.out.flush();


               try {
                     FileInputStream inFileStream = new FileInputStream(inFile);
							inFileStream.read(b);
                   } 
					catch (IOException e) 
					    {
                     System.err.println(e);
			          }
				    
//               msgUnit = new MessageUnit(pk.toXml(), ("hello:" + b.length).getBytes(), pq.toXml());

               msgUnit = new MessageUnit(pk.toXml(), b, pq.toXml());
					}
					
               PublishRetQos retQos = sender.publish(msgUnit);
               log.info(senderName, "Published message '" + retQos.getOid() + "' to " + receiverName);
            }
            catch (XmlBlasterException e) {
               log.warn(ME, "We have a problem: " + e.toString());
            }
            finally {
               try { Thread.currentThread().sleep(delay); } catch( InterruptedException i) {}
            }
         }
      }
      catch (XmlBlasterException e) {
         log.error(ME, "Houston, we have a problem: " + e.toString());
      }
      finally {
         if (sender != null) { sender.disconnect(new DisconnectQos()); }
      }
   }

   /**
    * Try
    * <pre>
    *   java PtpSend -help
    * </pre>
    * for usage help
    */
   public static void main(String args[]) {
      Global glob = new Global();
      
      if (glob.init(args) != 0) { // Get help with -help
         XmlBlasterConnection.usage();
         glob.getLog(null).info("PtpSend", "Example: java PtpSend -forceQueuing true\n");
         System.exit(1);
      }

      // new PtpSend(glob);
      new PtpSendInput(glob);
   }
}
