Brucalipto.org

Keep your door open!

Implementazione Di Un QueueRequestor Con Timeout.

| Comments

Mi sono trovato nella necessità di dover usare delle code JMS per lo scambio di messaggi tra due applicazioni. L’idea era che a domanda doveva seguire una risposta in maniera sincrona e le API javax.jms mettono a disposizione per lo scopo l’oggetto javax.jms.QueueRequestor.

Il problema

Il problema del QueueRequestor è che non ha un timeout dopo il quale ritorna in qualsiasi caso: se non si ottiene una risposta il QueueRequestor non fa procedere l’esecuzione e ciò non è bello.

La soluzione

Per la soluzione sono stato illuminato da questo post sui newsgroup: qui si afferma che il QueueRequestor è nient’altro che un wrapper di una serie di API pubbliche del package javax.jms. Ecco una classe d’esempio che si interfaccia ad una coda JMS Tibco:

QueueRequestorImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import javax.jms.*;
import java.io.Serializable;
import com.tibco.tibjms.TibjmsQueueConnectionFactory;

public class QueueRequestorImpl
{
  private final static String QUEUE_PATH = "tcp://localhost:7222";
  private final static String QUEUE_NAME = "testQueue";
  private final static long   QUEUE_TIMEOUT = 10000;

  public static Object connect(Serializable request)
  {
      QueueConnection queueConnection = null;
      QueueSession queueSession = null;
      Queue queue = null;
      ObjectMessage objmessage = null;

      TemporaryQueue tempQueue = null;
      QueueSender qSender = null;
      QueueReceiver qReceiver = null;

      try
      {
          QueueConnectionFactory queueConnectionFactory = null;
          queueConnectionFactory = new TibjmsQueueConnectionFactory(QUEUE_PATH);
          queueConnection = queueConnectionFactory.createQueueConnection();
          queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
          queue = getQueue(QUEUE_NAME, queueSession);

          tempQueue = queueSession.createTemporaryQueue();
          qReceiver = queueSession.createReceiver(tempQueue);
          qSender = queueSession.createSender(queue);
      }
      catch (Exception e)
      {
          return null;
      }
      finally
      {
          try
          {
              if (qReceiver!=null) qReceiver.close();
              if (qSender!=null) qSender.close();
              if (tempQueue!=null) tempQueue.delete();
              if (queueConnection!=null) queueConnection.close();
          }catch (Exception e){;}
      }
      try
      {
          objmessage = queueSession.createObjectMessage();
          objmessage.setJMSReplyTo(tempQueue);
          queueConnection.start();
          objmessage.setObject(request);

          long elapsedTime = System.currentTimeMillis();
          qSender.send(objmessage);
          Message response = qReceiver.receive(QUEUE_TIMEOUT);
          elapsedTime = System.currentTimeMillis() - elapsedTime;
          System.out.println("QueueRequestorImpl Response Time: " + elapsedTime);

          if (response!=null)
              System.out.println("Message RECEIVED.");
          else
          {
              System.out.println("Message TIMEOUT.");
              return null;
          }

          String sendIDString = objmessage.getJMSMessageID();
          String respIDString = response.getJMSCorrelationID();
          if (!sendIDString.equals(respIDString))
          {
              String msg = "'"+sendIDString+"'!='"+respIDString+"'";
              System.out.println("JMSCorrelationID MISMATCH ("+msg+").");
              return null;
          }

          return response;
      }
      catch (Exception e)
      {
          System.out.println("JMSException '"+e.getMessage()+"'");
          return null;
      }
      finally
      {
          try
          {
              if (qReceiver!=null) qReceiver.close();
              if (qSender!=null) qSender.close();
              if (tempQueue!=null) tempQueue.delete();
              if (queueConnection!=null) queueConnection.close();
          }
          catch (Exception e){;}
      }
  }

  public static Queue getQueue(String name, QueueSession session)
  {
      try
      {
          return session.createQueue(name);
      }
      catch(JMSException e)
      {
          throw new RuntimeException("Session closed");
      }
  }
}

Conclusione

Come si può vedere il problema è facilmente aggirabile, basta solo fare un po’ di googling e cercare nei javadoc.

Comments