Communicator.java

Go to the documentation of this file.
00001 /*
00002  * $Id: Communicator.java 2 2006-11-14 22:37:20Z vingarzan $
00003  *
00004  * Copyright (C) 2004-2006 FhG Fokus
00005  *
00006  * This file is part of Open IMS Core - an open source IMS CSCFs & HSS
00007  * implementation
00008  *
00009  * Open IMS Core is free software; you can redistribute it and/or modify
00010  * it under the terms of the GNU General Public License as published by
00011  * the Free Software Foundation; either version 2 of the License, or
00012  * (at your option) any later version.
00013  *
00014  * For a license to use the Open IMS Core software under conditions
00015  * other than those described here, or to purchase support for this
00016  * software, please contact Fraunhofer FOKUS by e-mail at the following
00017  * addresses:
00018  *     info@open-ims.org
00019  *
00020  * Open IMS Core is distributed in the hope that it will be useful,
00021  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00022  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00023  * GNU General Public License for more details.
00024  *
00025  * You should have received a copy of the GNU General Public License
00026  * along with this program; if not, write to the Free Software
00027  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00028  *
00029  */
00030 
00031 package de.fhg.fokus.diameter.DiameterPeer.transport;
00032 
00033 import java.io.IOException;
00034 import java.io.InputStream;
00035 import java.io.OutputStream;
00036 import java.net.Socket;
00037 
00038 import org.apache.log4j.Logger;
00039 
00040 import de.fhg.fokus.diameter.DiameterPeer.DiameterPeer;
00041 import de.fhg.fokus.diameter.DiameterPeer.data.AVP;
00042 import de.fhg.fokus.diameter.DiameterPeer.data.Codec;
00043 import de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessage;
00044 import de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessageDecodeException;
00045 import de.fhg.fokus.diameter.DiameterPeer.peer.Peer;
00046 import de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine;
00047 
00057 public class Communicator extends Thread {
00058     
00060     private static final Logger LOGGER = Logger.getLogger(Communicator.class);
00061 
00063     public DiameterPeer diameterPeer;
00064         
00066     public Peer peer=null;
00067     
00069     public boolean running; 
00070     
00072     public int direction;
00073     public static final int Initiator   = 0;
00074     public static final int Receiver    = 1;
00075     
00077     public Socket socket;
00078 
00079     private static int MAX_MESSAGE_LENGTH=1048576;
00080     
00081     
00089     public Communicator(Socket socket, DiameterPeer dp,int direction) {
00090         this.socket = socket;
00091         this.direction = direction;
00092         running = true;
00093         this.diameterPeer = dp;
00094         this.start();       
00095     }
00096      
00104     public Communicator(Socket socket, Peer p,int direction) {
00105         this.socket = socket;
00106         this.direction = direction;
00107         running = true;
00108         this.diameterPeer = p.diameterPeer;
00109         this.peer = p;
00110         this.start();       
00111     }
00112     
00113     
00114     
00121     public boolean sendMessage(DiameterMessage msg)
00122     {
00123         if (this.peer!=null){
00124             // to optimize the call and avoid critical zone 
00125             //StateMachine.process(peer,StateMachine.Send_Message,msg,this);
00126             StateMachine.Snd_Message(peer,msg);
00127         }
00128             
00129         return sendDirect(msg);
00130     }
00131     
00132     
00133     
00140     public synchronized boolean sendDirect(DiameterMessage msg)
00141     {
00142         if (!socket.isConnected()) {
00143             System.err.println("Communicator: Tried to send message to unconnected socket.");
00144             return false;
00145         }
00146         byte[] buffer;
00147         int sent=0;
00148         //LOGGER.debug("Communicator:sendDirect():"+msg.toString());
00149         try {
00150             OutputStream out=socket.getOutputStream();
00151             buffer = Codec.encodeDiameterMessage(msg);
00152             out.write(buffer,sent,buffer.length-sent);
00153             msg.networkTime = System.currentTimeMillis();
00154         } catch (Exception e){
00155             LOGGER.debug("Communicator: Error on message send\n");
00156             e.printStackTrace();
00157             return false;
00158         }
00159         return true;
00160     }
00161     
00162     /* (non-Javadoc)
00163      * @see java.lang.Runnable#run()
00164      */
00165     public void run() {
00166         InputStream in;
00167         byte[] buffer = new byte[MAX_MESSAGE_LENGTH];
00168         DiameterMessage msg;
00169         
00170         int cnt,len,x;
00171         
00172         try {
00173             socket.setTcpNoDelay(true);
00174             in = socket.getInputStream();
00175         } catch (IOException e) {
00176             System.err.println("Communicator: Error getting InputStream from socket");
00177             e.printStackTrace();
00178             return;
00179         }
00180 
00181         try {
00182                 while(running){
00183                     /* first we read the version */ 
00184                     cnt=0;
00185                     while(cnt<1){
00186                         x=in.read(buffer,cnt,1);
00187                         if (x<0) throw(new Exception("Read failed"));
00188                         cnt+=x;
00189                     }
00190                     if (buffer[0]!=1){
00191                         System.err.println("Communicator: Expecting diameter version 1. Received version "+buffer[0]);
00192                         continue;
00193                     }
00194                     /* then we read the length of the message */
00195                     while(cnt<4){
00196                         x = in.read(buffer,cnt,4-cnt);
00197                         if (x<0) throw(new Exception("Read failed"));
00198                         cnt+=x;
00199                     }
00200                     len = ((int)buffer[1]&0xFF)<<16 |
00201                           ((int)buffer[2]&0xFF)<< 8 |
00202                           ((int)buffer[3]&0xFF);
00203                     if (len>MAX_MESSAGE_LENGTH){
00204                         System.err.println("Communicator: Message too long ("+len+">"+MAX_MESSAGE_LENGTH+".");
00205                     }
00206                     /* and then we read all the rest of the message */
00207                     while(cnt<len){
00208                         x = in.read(buffer,cnt,len-cnt);
00209                         if (x<0) throw(new Exception("Read failed"));
00210                         cnt+=x;
00211                     }
00212                     //LOGGER.debug("received "+cnt+" bytes");
00213                     /* now we can decode the message */
00214                     try {
00215                         msg = Codec.decodeDiameterMessage(buffer,0);
00216                     } catch (DiameterMessageDecodeException e3) {
00217                         System.err.println("Communicator: Error decoding diameter message");
00218                         e3.printStackTrace();
00219                         continue;
00220                     }
00221                     //LOGGER.debug("Communicator:run(): "+msg.toString());
00222                     msg.networkTime = System.currentTimeMillis();
00223                     if (this.peer!=null)
00224                         this.peer.refreshTimer();
00225                     processMessage(msg);
00226                     msg = null;
00227                                                             
00228                 }
00229         } catch (Exception e1) {
00230             if (running){
00231                 if (this.peer!=null){
00232                     if (this.peer.I_comm==this) StateMachine.process(this.peer,StateMachine.I_Peer_Disc);
00233                     if (this.peer.R_comm==this) StateMachine.process(this.peer,StateMachine.R_Peer_Disc);
00234                 }
00235                 LOGGER.debug("Communicator: Error reading from InputStream. Closing socket.");
00236                 e1.printStackTrace();
00237             }/* else it was a shutdown request, it's normal */
00238         }
00239 
00240         running=false;
00241 
00242         try {
00243             socket.close();
00244         } catch (IOException e2) {
00245             System.err.println("Communicator: Error closing socket.");
00246             e2.printStackTrace();
00247         }
00248     }
00249     
00250     private void processMessage(DiameterMessage msg)
00251     {
00252         int event;
00253         
00254 //      LOGGER.debug("Communicator: Received message \n"+
00255 //              msg.toString()+"\n---");
00256         
00257         /* pre-processing for special states */
00258         if (this.peer!=null){
00259             switch (this.peer.state){
00260                 case StateMachine.Wait_I_CEA:
00261                     if (msg.commandCode!=DiameterMessage.Code_CE){
00262                         StateMachine.process(this.peer,StateMachine.I_Rcv_Non_CEA,msg,this);
00263                         return;
00264                     }
00265                     break;      
00266                 case StateMachine.R_Open:
00267                     switch (msg.commandCode){
00268                         case DiameterMessage.Code_CE:
00269                             if (msg.flagRequest)
00270                                 StateMachine.process(this.peer,StateMachine.R_Rcv_CER,msg,this);
00271                             else
00272                                 StateMachine.process(this.peer,StateMachine.R_Rcv_CEA,msg,this);
00273                             return;
00274                         case DiameterMessage.Code_DW:
00275                             if (msg.flagRequest)
00276                                 StateMachine.process(this.peer,StateMachine.R_Rcv_DWR,msg,this);
00277                             else
00278                                 StateMachine.process(this.peer,StateMachine.R_Rcv_DWA,msg,this);
00279                             return;
00280                         case DiameterMessage.Code_DP:
00281                             if (msg.flagRequest)
00282                                 StateMachine.process(this.peer,StateMachine.R_Rcv_DPR,msg,this);
00283                             else
00284                                 StateMachine.process(this.peer,StateMachine.R_Rcv_DPA,msg,this);
00285                             return;
00286                         default:
00287                             /* faster processing -> no state machine for regular messages */
00288                             //StateMachine.process(this.peer,StateMachine.R_Rcv_Message,msg,this);
00289                             StateMachine.Rcv_Process(this.peer,msg);
00290                             return;
00291                     }
00292                 case StateMachine.I_Open:
00293                     switch (msg.commandCode){
00294                     case DiameterMessage.Code_CE:
00295                         if (msg.flagRequest)
00296                             StateMachine.process(this.peer,StateMachine.I_Rcv_CER,msg,this);
00297                         else
00298                             StateMachine.process(this.peer,StateMachine.I_Rcv_CEA,msg,this);
00299                         return;
00300                     case DiameterMessage.Code_DW:
00301                         if (msg.flagRequest)
00302                             StateMachine.process(this.peer,StateMachine.I_Rcv_DWR,msg,this);
00303                         else
00304                             StateMachine.process(this.peer,StateMachine.I_Rcv_DWA,msg,this);
00305                         return;
00306                     case DiameterMessage.Code_DP:
00307                         if (msg.flagRequest)
00308                             StateMachine.process(this.peer,StateMachine.I_Rcv_DPR,msg,this);
00309                         else
00310                             StateMachine.process(this.peer,StateMachine.I_Rcv_DPA,msg,this);
00311                         return;
00312                     default:
00313                         /* faster processing -> no state machine for regular messages */
00314                         //StateMachine.process(this.peer,StateMachine.I_Rcv_Message,msg,this);
00315                         StateMachine.Rcv_Process(this.peer,msg);
00316                         return;
00317                 }
00318             }
00319         }
00320         
00321         /* main processing */
00322         switch (msg.commandCode){
00323             case DiameterMessage.Code_CE:
00324                 if (msg.flagRequest) {
00325                     /* CER  - Special processing to find the peer */
00326                     /* find peer */
00327                     AVP fqdn,realm;
00328                     Peer p;
00329                     fqdn = msg.findAVP(AVP.Origin_Host,true,0);
00330                     if (fqdn==null) {
00331                         System.err.println("Communicator: CER Received without Origin-Host");
00332                         return;
00333                     }
00334                     realm = msg.findAVP(AVP.Origin_Realm,true,0);
00335                     if (realm==null) {
00336                         System.err.println("Communicator: CER Received without Origin-Realm");
00337                         return;
00338                     }
00339                     p = diameterPeer.peerManager.getPeerByFQDN(new String(fqdn.data));
00340                     if (p==null) {
00341                         p = diameterPeer.peerManager.addDynamicPeer(new String(fqdn.data),
00342                                     new String(realm.data));
00343                     }
00344                     if (p==null){
00345                         //Give up
00346                         System.err.println("Communicator: Not Allowed to create new Peer");
00347                         return;
00348                     }
00349                     this.peer = p;
00350                     /* call state machine */    
00351                     StateMachine.process(p,StateMachine.R_Conn_CER,msg,this);
00352     
00353                 }else{
00354                     /* CEA */
00355                     if (this.peer==null) {
00356                         System.err.println("Receiver: received CEA for an unknown peer");
00357                         System.err.println(msg.toString());
00358                     }else{
00359                         if (this.direction == Initiator) event = StateMachine.I_Rcv_CEA;
00360                         else event = StateMachine.R_Rcv_CEA;
00361                         StateMachine.process(this.peer,event,msg,this);
00362                     }
00363                 }
00364                 break;
00365                 
00366             case DiameterMessage.Code_DW:
00367                 if (msg.flagRequest){
00368                     if (this.direction == Initiator) event = StateMachine.I_Rcv_DWR;
00369                     else event = StateMachine.R_Rcv_DWR;
00370                     StateMachine.process(peer,event,msg,this);
00371                 }
00372                 else { 
00373                     if (this.direction == Initiator) event = StateMachine.I_Rcv_DWA;
00374                     else event = StateMachine.R_Rcv_DWA;
00375                     StateMachine.process(peer,event,msg,this);
00376                 }
00377                 break;
00378             
00379             case DiameterMessage.Code_DP:
00380                 if (msg.flagRequest){
00381                     if (this.direction == Initiator) event = StateMachine.I_Rcv_DPR;
00382                     else event = StateMachine.R_Rcv_DPR;
00383                     StateMachine.process(peer,event,msg,this);
00384                 }
00385                 else { 
00386                     if (this.direction == Initiator) event = StateMachine.I_Rcv_DPA;
00387                     else event = StateMachine.R_Rcv_DPA;
00388                     StateMachine.process(peer,event,msg,this);
00389                 }
00390                 break;
00391             
00392             default:
00393                 /*
00394                 if (this.direction == Initiator) event = StateMachine.I_Rcv_Message;
00395                 else event = StateMachine.R_Rcv_Message;
00396                 StateMachine.process(peer,event,msg,this);
00397                 */
00398                 /* faster processing -> no state machine for regular messages */
00399                 StateMachine.Rcv_Process(this.peer,msg);
00400         }
00401     }
00402     
00406     public void shutdown()
00407     {
00408         this.running = false;
00409         try {
00410             this.socket.close();
00411         } catch (IOException e) {
00412             System.err.println("Communicator: Shutdown - error closing socket");
00413             e.printStackTrace();
00414         }
00415 
00416     }
00417 }

Generated on Thu Oct 23 04:07:36 2008 for Open IMS Core JavaDiameterPeer by  doxygen 1.5.2