00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 package de.fhg.fokus.diameter.DiameterPeer.transport;
00032
00033 import java.io.IOException;
00034 import java.io.InputStream;
00035 import java.io.OutputStream;
00036 import java.net.Socket;
00037
00038 import org.apache.log4j.Logger;
00039
00040 import de.fhg.fokus.diameter.DiameterPeer.DiameterPeer;
00041 import de.fhg.fokus.diameter.DiameterPeer.data.AVP;
00042 import de.fhg.fokus.diameter.DiameterPeer.data.Codec;
00043 import de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessage;
00044 import de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessageDecodeException;
00045 import de.fhg.fokus.diameter.DiameterPeer.peer.Peer;
00046 import de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine;
00047
00057 public class Communicator extends Thread {
00058
00060 private static final Logger LOGGER = Logger.getLogger(Communicator.class);
00061
00063 public DiameterPeer diameterPeer;
00064
00066 public Peer peer=null;
00067
00069 public boolean running;
00070
00072 public int direction;
00073 public static final int Initiator = 0;
00074 public static final int Receiver = 1;
00075
00077 public Socket socket;
00078
00079 private static int MAX_MESSAGE_LENGTH=1048576;
00080
00081
00089 public Communicator(Socket socket, DiameterPeer dp,int direction) {
00090 this.socket = socket;
00091 this.direction = direction;
00092 running = true;
00093 this.diameterPeer = dp;
00094 this.start();
00095 }
00096
00104 public Communicator(Socket socket, Peer p,int direction) {
00105 this.socket = socket;
00106 this.direction = direction;
00107 running = true;
00108 this.diameterPeer = p.diameterPeer;
00109 this.peer = p;
00110 this.start();
00111 }
00112
00113
00114
00121 public boolean sendMessage(DiameterMessage msg)
00122 {
00123 if (this.peer!=null){
00124
00125
00126 StateMachine.Snd_Message(peer,msg);
00127 }
00128
00129 return sendDirect(msg);
00130 }
00131
00132
00133
00140 public synchronized boolean sendDirect(DiameterMessage msg)
00141 {
00142 if (!socket.isConnected()) {
00143 System.err.println("Communicator: Tried to send message to unconnected socket.");
00144 return false;
00145 }
00146 byte[] buffer;
00147 int sent=0;
00148
00149 try {
00150 OutputStream out=socket.getOutputStream();
00151 buffer = Codec.encodeDiameterMessage(msg);
00152 out.write(buffer,sent,buffer.length-sent);
00153 msg.networkTime = System.currentTimeMillis();
00154 } catch (Exception e){
00155 LOGGER.debug("Communicator: Error on message send\n");
00156 e.printStackTrace();
00157 return false;
00158 }
00159 return true;
00160 }
00161
00162
00163
00164
00165 public void run() {
00166 InputStream in;
00167 byte[] buffer = new byte[MAX_MESSAGE_LENGTH];
00168 DiameterMessage msg;
00169
00170 int cnt,len,x;
00171
00172 try {
00173 socket.setTcpNoDelay(true);
00174 in = socket.getInputStream();
00175 } catch (IOException e) {
00176 System.err.println("Communicator: Error getting InputStream from socket");
00177 e.printStackTrace();
00178 return;
00179 }
00180
00181 try {
00182 while(running){
00183
00184 cnt=0;
00185 while(cnt<1){
00186 x=in.read(buffer,cnt,1);
00187 if (x<0) throw(new Exception("Read failed"));
00188 cnt+=x;
00189 }
00190 if (buffer[0]!=1){
00191 System.err.println("Communicator: Expecting diameter version 1. Received version "+buffer[0]);
00192 continue;
00193 }
00194
00195 while(cnt<4){
00196 x = in.read(buffer,cnt,4-cnt);
00197 if (x<0) throw(new Exception("Read failed"));
00198 cnt+=x;
00199 }
00200 len = ((int)buffer[1]&0xFF)<<16 |
00201 ((int)buffer[2]&0xFF)<< 8 |
00202 ((int)buffer[3]&0xFF);
00203 if (len>MAX_MESSAGE_LENGTH){
00204 System.err.println("Communicator: Message too long ("+len+">"+MAX_MESSAGE_LENGTH+".");
00205 }
00206
00207 while(cnt<len){
00208 x = in.read(buffer,cnt,len-cnt);
00209 if (x<0) throw(new Exception("Read failed"));
00210 cnt+=x;
00211 }
00212
00213
00214 try {
00215 msg = Codec.decodeDiameterMessage(buffer,0);
00216 } catch (DiameterMessageDecodeException e3) {
00217 System.err.println("Communicator: Error decoding diameter message");
00218 e3.printStackTrace();
00219 continue;
00220 }
00221
00222 msg.networkTime = System.currentTimeMillis();
00223 if (this.peer!=null)
00224 this.peer.refreshTimer();
00225 processMessage(msg);
00226 msg = null;
00227
00228 }
00229 } catch (Exception e1) {
00230 if (running){
00231 if (this.peer!=null){
00232 if (this.peer.I_comm==this) StateMachine.process(this.peer,StateMachine.I_Peer_Disc);
00233 if (this.peer.R_comm==this) StateMachine.process(this.peer,StateMachine.R_Peer_Disc);
00234 }
00235 LOGGER.debug("Communicator: Error reading from InputStream. Closing socket.");
00236 e1.printStackTrace();
00237 }
00238 }
00239
00240 running=false;
00241
00242 try {
00243 socket.close();
00244 } catch (IOException e2) {
00245 System.err.println("Communicator: Error closing socket.");
00246 e2.printStackTrace();
00247 }
00248 }
00249
00250 private void processMessage(DiameterMessage msg)
00251 {
00252 int event;
00253
00254
00255
00256
00257
00258 if (this.peer!=null){
00259 switch (this.peer.state){
00260 case StateMachine.Wait_I_CEA:
00261 if (msg.commandCode!=DiameterMessage.Code_CE){
00262 StateMachine.process(this.peer,StateMachine.I_Rcv_Non_CEA,msg,this);
00263 return;
00264 }
00265 break;
00266 case StateMachine.R_Open:
00267 switch (msg.commandCode){
00268 case DiameterMessage.Code_CE:
00269 if (msg.flagRequest)
00270 StateMachine.process(this.peer,StateMachine.R_Rcv_CER,msg,this);
00271 else
00272 StateMachine.process(this.peer,StateMachine.R_Rcv_CEA,msg,this);
00273 return;
00274 case DiameterMessage.Code_DW:
00275 if (msg.flagRequest)
00276 StateMachine.process(this.peer,StateMachine.R_Rcv_DWR,msg,this);
00277 else
00278 StateMachine.process(this.peer,StateMachine.R_Rcv_DWA,msg,this);
00279 return;
00280 case DiameterMessage.Code_DP:
00281 if (msg.flagRequest)
00282 StateMachine.process(this.peer,StateMachine.R_Rcv_DPR,msg,this);
00283 else
00284 StateMachine.process(this.peer,StateMachine.R_Rcv_DPA,msg,this);
00285 return;
00286 default:
00287
00288
00289 StateMachine.Rcv_Process(this.peer,msg);
00290 return;
00291 }
00292 case StateMachine.I_Open:
00293 switch (msg.commandCode){
00294 case DiameterMessage.Code_CE:
00295 if (msg.flagRequest)
00296 StateMachine.process(this.peer,StateMachine.I_Rcv_CER,msg,this);
00297 else
00298 StateMachine.process(this.peer,StateMachine.I_Rcv_CEA,msg,this);
00299 return;
00300 case DiameterMessage.Code_DW:
00301 if (msg.flagRequest)
00302 StateMachine.process(this.peer,StateMachine.I_Rcv_DWR,msg,this);
00303 else
00304 StateMachine.process(this.peer,StateMachine.I_Rcv_DWA,msg,this);
00305 return;
00306 case DiameterMessage.Code_DP:
00307 if (msg.flagRequest)
00308 StateMachine.process(this.peer,StateMachine.I_Rcv_DPR,msg,this);
00309 else
00310 StateMachine.process(this.peer,StateMachine.I_Rcv_DPA,msg,this);
00311 return;
00312 default:
00313
00314
00315 StateMachine.Rcv_Process(this.peer,msg);
00316 return;
00317 }
00318 }
00319 }
00320
00321
00322 switch (msg.commandCode){
00323 case DiameterMessage.Code_CE:
00324 if (msg.flagRequest) {
00325
00326
00327 AVP fqdn,realm;
00328 Peer p;
00329 fqdn = msg.findAVP(AVP.Origin_Host,true,0);
00330 if (fqdn==null) {
00331 System.err.println("Communicator: CER Received without Origin-Host");
00332 return;
00333 }
00334 realm = msg.findAVP(AVP.Origin_Realm,true,0);
00335 if (realm==null) {
00336 System.err.println("Communicator: CER Received without Origin-Realm");
00337 return;
00338 }
00339 p = diameterPeer.peerManager.getPeerByFQDN(new String(fqdn.data));
00340 if (p==null) {
00341 p = diameterPeer.peerManager.addDynamicPeer(new String(fqdn.data),
00342 new String(realm.data));
00343 }
00344 if (p==null){
00345
00346 System.err.println("Communicator: Not Allowed to create new Peer");
00347 return;
00348 }
00349 this.peer = p;
00350
00351 StateMachine.process(p,StateMachine.R_Conn_CER,msg,this);
00352
00353 }else{
00354
00355 if (this.peer==null) {
00356 System.err.println("Receiver: received CEA for an unknown peer");
00357 System.err.println(msg.toString());
00358 }else{
00359 if (this.direction == Initiator) event = StateMachine.I_Rcv_CEA;
00360 else event = StateMachine.R_Rcv_CEA;
00361 StateMachine.process(this.peer,event,msg,this);
00362 }
00363 }
00364 break;
00365
00366 case DiameterMessage.Code_DW:
00367 if (msg.flagRequest){
00368 if (this.direction == Initiator) event = StateMachine.I_Rcv_DWR;
00369 else event = StateMachine.R_Rcv_DWR;
00370 StateMachine.process(peer,event,msg,this);
00371 }
00372 else {
00373 if (this.direction == Initiator) event = StateMachine.I_Rcv_DWA;
00374 else event = StateMachine.R_Rcv_DWA;
00375 StateMachine.process(peer,event,msg,this);
00376 }
00377 break;
00378
00379 case DiameterMessage.Code_DP:
00380 if (msg.flagRequest){
00381 if (this.direction == Initiator) event = StateMachine.I_Rcv_DPR;
00382 else event = StateMachine.R_Rcv_DPR;
00383 StateMachine.process(peer,event,msg,this);
00384 }
00385 else {
00386 if (this.direction == Initiator) event = StateMachine.I_Rcv_DPA;
00387 else event = StateMachine.R_Rcv_DPA;
00388 StateMachine.process(peer,event,msg,this);
00389 }
00390 break;
00391
00392 default:
00393
00394
00395
00396
00397
00398
00399 StateMachine.Rcv_Process(this.peer,msg);
00400 }
00401 }
00402
00406 public void shutdown()
00407 {
00408 this.running = false;
00409 try {
00410 this.socket.close();
00411 } catch (IOException e) {
00412 System.err.println("Communicator: Shutdown - error closing socket");
00413 e.printStackTrace();
00414 }
00415
00416 }
00417 }