A communicator maintains a connection with a peer. After its creation, it will be managed by the PeerManager.
Definition at line 57 of file Communicator.java.
Public Member Functions
| | Communicator (Socket socket, DiameterPeer dp, int direction) | Constructor giving the opened socket. |
| | Communicator (Socket socket, Peer p, int direction) | Constructor giving the opened socket. |
| boolean | sendMessage (DiameterMessage msg) | Send a Diameter message. |
| synchronized boolean | sendDirect (DiameterMessage msg) | Send a Diameter message. |
| void | run () | |
| void | shutdown () | Shutdown the socket. |
Public Attributes
| DiameterPeer | diameterPeer | DiameterPeer API reference. |
| Peer | peer = null | Peer it is communicating for. |
| boolean | running | Indicator if still active. |
| int | direction | Direction of socket opening. |
| Socket | socket | Socket connected to. |
Static Public Attributes
| static final int | Initiator = 0 | |
| static final int | Receiver = 1 | |
Private Member Functions
| void | processMessage (DiameterMessage msg) | |
Static Private Attributes
| static final Logger | LOGGER = Logger.getLogger(Communicator.class) | The logger. |
| static int | MAX_MESSAGE_LENGTH = 1048576 | |
de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.Communicator (Socket socket, DiameterPeer dp, int direction)
Constructor giving the opened socket.
| socket | Socket, which should already be open. |
| dp | DiameterPeer, which contains several Peers. |
| direction | 0 for initiator, 1 for receiver (matching the Initiator and Receiver constants). |
Definition at line 89 of file Communicator.java.
References de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.running.
    {
        this.socket = socket;
        this.direction = direction;
        running = true;
        this.diameterPeer = dp;
        this.start();
    }
de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.Communicator (Socket socket, Peer p, int direction)
Constructor giving the opened socket.
| socket | Socket, which should already be open. |
| p | Peer for which the socket is opened. |
| direction | 0 for initiator, 1 for receiver (matching the Initiator and Receiver constants). |
Definition at line 104 of file Communicator.java.
References de.fhg.fokus.diameter.DiameterPeer.peer.Peer.diameterPeer, and de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.running.
    {
        this.socket = socket;
        this.direction = direction;
        running = true;
        this.diameterPeer = p.diameterPeer;
        this.peer = p;
        this.start();
    }
boolean de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.sendMessage (DiameterMessage msg)
Send a Diameter message.
| msg | The Diameter message to send. |
Definition at line 121 of file Communicator.java.
References de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.peer, and de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.sendDirect().
Referenced by de.fhg.fokus.diameter.DiameterPeer.peer.Peer.sendMessage().
    {
        if (this.peer!=null){
            // to optimize the call and avoid critical zone
            //StateMachine.process(peer,StateMachine.Send_Message,msg,this);
            StateMachine.Snd_Message(peer,msg);
        }

        return sendDirect(msg);
    }
synchronized boolean de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.sendDirect (DiameterMessage msg)
Send a Diameter message.
| msg | The Diameter message (request or answer) to send. |
Definition at line 140 of file Communicator.java.
References de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.LOGGER, de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessage.networkTime, and de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.socket.
Referenced by de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.I_Snd_CER(), de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.sendMessage(), de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.Snd_CEA(), de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.Snd_DPA(), de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.Snd_DPR(), de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.Snd_DWA(), and de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.Snd_DWR().
    {
        if (!socket.isConnected()) {
            System.err.println("Communicator: Tried to send message to unconnected socket.");
            return false;
        }
        byte[] buffer;
        int sent=0;
        //LOGGER.debug("Communicator:sendDirect():"+msg.toString());
        try {
            OutputStream out=socket.getOutputStream();
            buffer = Codec.encodeDiameterMessage(msg);
            out.write(buffer,sent,buffer.length-sent);
            msg.networkTime = System.currentTimeMillis();
        } catch (Exception e){
            LOGGER.debug("Communicator: Error on message send\n");
            e.printStackTrace();
            return false;
        }
        return true;
    }
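sendDirect() hands the message to Codec.encodeDiameterMessage(), whose implementation is not part of this page. As a rough illustration of what reaches the socket, the sketch below builds the fixed 20-byte header that RFC 6733 defines for every Diameter message. The class name DiameterHeaderSketch and the method encodeHeader are hypothetical and are not part of this library; the sketch encodes a header only, with no AVPs.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not the library's Codec.
final class DiameterHeaderSketch {
    /** Size of the fixed Diameter header in bytes (RFC 6733). */
    static final int HEADER_LENGTH = 20;

    /** Builds the 20-byte header of a message that carries no AVPs. */
    static byte[] encodeHeader(int flags, int commandCode, int applicationId,
                               int hopByHopId, int endToEndId) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH); // big-endian by default
        // Byte 0: version (1). Bytes 1..3: total message length.
        buf.putInt((1 << 24) | HEADER_LENGTH);
        // Byte 4: command flags. Bytes 5..7: 24-bit command code.
        buf.putInt((flags & 0xFF) << 24 | (commandCode & 0xFFFFFF));
        buf.putInt(applicationId);
        buf.putInt(hopByHopId);
        buf.putInt(endToEndId);
        return buf.array();
    }
}
```

For example, `DiameterHeaderSketch.encodeHeader(0x80, 280, 0, 1, 2)` would describe a Device-Watchdog request (command code 280 with the request flag set) with hop-by-hop identifier 1 and end-to-end identifier 2.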
void de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.run ()
Definition at line 165 of file Communicator.java.
References de.fhg.fokus.diameter.DiameterPeer.peer.Peer.I_comm, de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.LOGGER, de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.MAX_MESSAGE_LENGTH, de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessage.networkTime, de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.peer, de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.processMessage(), de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.running, and de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.socket.
    {
        InputStream in;
        byte[] buffer = new byte[MAX_MESSAGE_LENGTH];
        DiameterMessage msg;

        int cnt,len,x;

        try {
            socket.setTcpNoDelay(true);
            in = socket.getInputStream();
        } catch (IOException e) {
            System.err.println("Communicator: Error getting InputStream from socket");
            e.printStackTrace();
            return;
        }

        try {
            while(running){
                /* first we read the version */
                cnt=0;
                while(cnt<1){
                    x=in.read(buffer,cnt,1);
                    if (x<0) throw(new Exception("Read failed"));
                    cnt+=x;
                }
                if (buffer[0]!=1){
                    System.err.println("Communicator: Expecting diameter version 1. Received version "+buffer[0]);
                    continue;
                }
                /* then we read the length of the message */
                while(cnt<4){
                    x = in.read(buffer,cnt,4-cnt);
                    if (x<0) throw(new Exception("Read failed"));
                    cnt+=x;
                }
                len = ((int)buffer[1]&0xFF)<<16 |
                      ((int)buffer[2]&0xFF)<< 8 |
                      ((int)buffer[3]&0xFF);
                if (len>MAX_MESSAGE_LENGTH){
                    System.err.println("Communicator: Message too long ("+len+">"+MAX_MESSAGE_LENGTH+".");
                }
                /* and then we read all the rest of the message */
                while(cnt<len){
                    x = in.read(buffer,cnt,len-cnt);
                    if (x<0) throw(new Exception("Read failed"));
                    cnt+=x;
                }
                //LOGGER.debug("received "+cnt+" bytes");
                /* now we can decode the message */
                try {
                    msg = Codec.decodeDiameterMessage(buffer,0);
                } catch (DiameterMessageDecodeException e3) {
                    System.err.println("Communicator: Error decoding diameter message");
                    e3.printStackTrace();
                    continue;
                }
                //LOGGER.debug("Communicator:run(): "+msg.toString());
                msg.networkTime = System.currentTimeMillis();
                if (this.peer!=null)
                    this.peer.refreshTimer();
                processMessage(msg);
                msg = null;

            }
        } catch (Exception e1) {
            if (running){
                if (this.peer!=null){
                    if (this.peer.I_comm==this) StateMachine.process(this.peer,StateMachine.I_Peer_Disc);
                    if (this.peer.R_comm==this) StateMachine.process(this.peer,StateMachine.R_Peer_Disc);
                }
                LOGGER.debug("Communicator: Error reading from InputStream. Closing socket.");
                e1.printStackTrace();
            }/* else it was a shutdown request, it's normal */
        }

        running=false;

        try {
            socket.close();
        } catch (IOException e2) {
            System.err.println("Communicator: Error closing socket.");
            e2.printStackTrace();
        }
    }
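The read loop in run() frames messages by reading the version byte, then the 24-bit length, then the remainder. Note that the listing only logs a length above MAX_MESSAGE_LENGTH and still tries to read the message into the fixed buffer. The sketch below shows the same framing idea with the length checked before any payload is read. It is a minimal illustration and not the library's code: the class DiameterFraming and its methods are hypothetical names, and rejecting the stream with an IOException is an assumed policy.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper for illustration only; not part of the library.
final class DiameterFraming {
    /** Size of the fixed Diameter header in bytes (RFC 6733). */
    static final int HEADER_LENGTH = 20;
    /** Same limit as MAX_MESSAGE_LENGTH in the Communicator class. */
    static final int MAX_MESSAGE_LENGTH = 1048576;

    /** Decodes the 24-bit big-endian message length stored in bytes 1..3. */
    static int messageLength(byte[] header) {
        return (header[1] & 0xFF) << 16 | (header[2] & 0xFF) << 8 | (header[3] & 0xFF);
    }

    /**
     * Reads one complete message, header included, from the stream.
     * Throws IOException on an unexpected version or an out-of-range length,
     * and EOFException (an IOException) if the stream ends mid-message.
     */
    static byte[] readMessage(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        byte[] head = new byte[4];
        din.readFully(head); // blocks until version + length are available
        if (head[0] != 1) {
            throw new IOException("Unsupported Diameter version " + head[0]);
        }
        int len = messageLength(head);
        if (len < HEADER_LENGTH || len > MAX_MESSAGE_LENGTH) {
            throw new IOException("Invalid message length " + len);
        }
        byte[] message = new byte[len];
        System.arraycopy(head, 0, message, 0, head.length);
        din.readFully(message, head.length, len - head.length);
        return message;
    }
}
```

Allocating the array after validating the length keeps a malformed header from driving a read past the end of a preallocated buffer; whether to drop the connection or try to resynchronise after such an error is a design decision this sketch leaves to the caller.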
void de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.processMessage (DiameterMessage msg) [private]
Definition at line 250 of file Communicator.java.
References de.fhg.fokus.diameter.DiameterPeer.peer.PeerManager.addDynamicPeer(), de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessage.commandCode, de.fhg.fokus.diameter.DiameterPeer.data.AVP.data, de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.diameterPeer, de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.direction, de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessage.findAVP(), de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessage.flagRequest, de.fhg.fokus.diameter.DiameterPeer.peer.PeerManager.getPeerByFQDN(), de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.Initiator, de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.peer, de.fhg.fokus.diameter.DiameterPeer.DiameterPeer.peerManager, de.fhg.fokus.diameter.DiameterPeer.peer.Peer.state, and de.fhg.fokus.diameter.DiameterPeer.data.DiameterMessage.toString().
Referenced by de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.run().
    {
        int event;

        // LOGGER.debug("Communicator: Received message \n"+
        //      msg.toString()+"\n---");

        /* pre-processing for special states */
        if (this.peer!=null){
            switch (this.peer.state){
                case StateMachine.Wait_I_CEA:
                    if (msg.commandCode!=DiameterMessage.Code_CE){
                        StateMachine.process(this.peer,StateMachine.I_Rcv_Non_CEA,msg,this);
                        return;
                    }
                    break;
                case StateMachine.R_Open:
                    switch (msg.commandCode){
                        case DiameterMessage.Code_CE:
                            if (msg.flagRequest)
                                StateMachine.process(this.peer,StateMachine.R_Rcv_CER,msg,this);
                            else
                                StateMachine.process(this.peer,StateMachine.R_Rcv_CEA,msg,this);
                            return;
                        case DiameterMessage.Code_DW:
                            if (msg.flagRequest)
                                StateMachine.process(this.peer,StateMachine.R_Rcv_DWR,msg,this);
                            else
                                StateMachine.process(this.peer,StateMachine.R_Rcv_DWA,msg,this);
                            return;
                        case DiameterMessage.Code_DP:
                            if (msg.flagRequest)
                                StateMachine.process(this.peer,StateMachine.R_Rcv_DPR,msg,this);
                            else
                                StateMachine.process(this.peer,StateMachine.R_Rcv_DPA,msg,this);
                            return;
                        default:
                            /* faster processing -> no state machine for regular messages */
                            //StateMachine.process(this.peer,StateMachine.R_Rcv_Message,msg,this);
                            StateMachine.Rcv_Process(this.peer,msg);
                            return;
                    }
                case StateMachine.I_Open:
                    switch (msg.commandCode){
                        case DiameterMessage.Code_CE:
                            if (msg.flagRequest)
                                StateMachine.process(this.peer,StateMachine.I_Rcv_CER,msg,this);
                            else
                                StateMachine.process(this.peer,StateMachine.I_Rcv_CEA,msg,this);
                            return;
                        case DiameterMessage.Code_DW:
                            if (msg.flagRequest)
                                StateMachine.process(this.peer,StateMachine.I_Rcv_DWR,msg,this);
                            else
                                StateMachine.process(this.peer,StateMachine.I_Rcv_DWA,msg,this);
                            return;
                        case DiameterMessage.Code_DP:
                            if (msg.flagRequest)
                                StateMachine.process(this.peer,StateMachine.I_Rcv_DPR,msg,this);
                            else
                                StateMachine.process(this.peer,StateMachine.I_Rcv_DPA,msg,this);
                            return;
                        default:
                            /* faster processing -> no state machine for regular messages */
                            //StateMachine.process(this.peer,StateMachine.I_Rcv_Message,msg,this);
                            StateMachine.Rcv_Process(this.peer,msg);
                            return;
                    }
            }
        }

        /* main processing */
        switch (msg.commandCode){
            case DiameterMessage.Code_CE:
                if (msg.flagRequest) {
                    /* CER - Special processing to find the peer */
                    /* find peer */
                    AVP fqdn,realm;
                    Peer p;
                    fqdn = msg.findAVP(AVP.Origin_Host,true,0);
                    if (fqdn==null) {
                        System.err.println("Communicator: CER Received without Origin-Host");
                        return;
                    }
                    realm = msg.findAVP(AVP.Origin_Realm,true,0);
                    if (realm==null) {
                        System.err.println("Communicator: CER Received without Origin-Realm");
                        return;
                    }
                    p = diameterPeer.peerManager.getPeerByFQDN(new String(fqdn.data));
                    if (p==null) {
                        p = diameterPeer.peerManager.addDynamicPeer(new String(fqdn.data),
                                new String(realm.data));
                    }
                    if (p==null){
                        //Give up
                        System.err.println("Communicator: Not Allowed to create new Peer");
                        return;
                    }
                    this.peer = p;
                    /* call state machine */
                    StateMachine.process(p,StateMachine.R_Conn_CER,msg,this);

                }else{
                    /* CEA */
                    if (this.peer==null) {
                        System.err.println("Receiver: received CEA for an unknown peer");
                        System.err.println(msg.toString());
                    }else{
                        if (this.direction == Initiator) event = StateMachine.I_Rcv_CEA;
                        else event = StateMachine.R_Rcv_CEA;
                        StateMachine.process(this.peer,event,msg,this);
                    }
                }
                break;

            case DiameterMessage.Code_DW:
                if (msg.flagRequest){
                    if (this.direction == Initiator) event = StateMachine.I_Rcv_DWR;
                    else event = StateMachine.R_Rcv_DWR;
                    StateMachine.process(peer,event,msg,this);
                }
                else {
                    if (this.direction == Initiator) event = StateMachine.I_Rcv_DWA;
                    else event = StateMachine.R_Rcv_DWA;
                    StateMachine.process(peer,event,msg,this);
                }
                break;

            case DiameterMessage.Code_DP:
                if (msg.flagRequest){
                    if (this.direction == Initiator) event = StateMachine.I_Rcv_DPR;
                    else event = StateMachine.R_Rcv_DPR;
                    StateMachine.process(peer,event,msg,this);
                }
                else {
                    if (this.direction == Initiator) event = StateMachine.I_Rcv_DPA;
                    else event = StateMachine.R_Rcv_DPA;
                    StateMachine.process(peer,event,msg,this);
                }
                break;

            default:
                /*
                if (this.direction == Initiator) event = StateMachine.I_Rcv_Message;
                else event = StateMachine.R_Rcv_Message;
                StateMachine.process(peer,event,msg,this);
                */
                /* faster processing -> no state machine for regular messages */
                StateMachine.Rcv_Process(this.peer,msg);
        }
    }
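The "main processing" branch of processMessage() derives a state-machine event from three inputs: the direction of the connection, the command code, and the request flag. The sketch below condenses that mapping into one function that returns the event name as a string. It is only an illustration of the pattern: EventSelectionSketch and eventFor are hypothetical names, the numeric command codes are taken from RFC 6733 (the values of DiameterMessage.Code_CE, Code_DW and Code_DP are not shown on this page), and the pre-processing by peer state is not modelled.

```java
// Hypothetical helper for illustration only; not part of the library.
final class EventSelectionSketch {
    // Same values as Communicator.Initiator and Communicator.Receiver.
    static final int INITIATOR = 0;
    static final int RECEIVER = 1;

    // Base-protocol command codes from RFC 6733 (assumed to match Code_CE/DW/DP).
    static final int CODE_CE = 257; // Capabilities-Exchange
    static final int CODE_DW = 280; // Device-Watchdog
    static final int CODE_DP = 282; // Disconnect-Peer

    /** Returns the event name chosen for a message, e.g. "I_Rcv_DWR". */
    static String eventFor(int direction, int commandCode, boolean isRequest) {
        String command;
        switch (commandCode) {
            case CODE_CE:
                // A CER is always handled as a new incoming connection.
                if (isRequest) return "R_Conn_CER";
                command = "CE";
                break;
            case CODE_DW: command = "DW"; break;
            case CODE_DP: command = "DP"; break;
            default:
                // Regular messages bypass the state machine.
                return "Rcv_Process";
        }
        String prefix = (direction == INITIATOR) ? "I" : "R";
        return prefix + "_Rcv_" + command + (isRequest ? "R" : "A");
    }
}
```

With this sketch, a watchdog request on an initiator connection maps to `I_Rcv_DWR`, and a Disconnect-Peer answer on a receiver connection maps to `R_Rcv_DPA`.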
void de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.shutdown ()
Shutdown the socket.
Definition at line 406 of file Communicator.java.
Referenced by de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.Cleanup(), de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.I_Disc(), de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.I_Snd_Conn_Req(), de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.R_Disc(), and de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.R_Reject().
    {
        this.running = false;
        try {
            this.socket.close();
        } catch (IOException e) {
            System.err.println("Communicator: Shutdown - error closing socket");
            e.printStackTrace();
        }

    }
final Logger de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.LOGGER = Logger.getLogger(Communicator.class) [static, private]
The logger.
Definition at line 60 of file Communicator.java.
Referenced by de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.run(), and de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.sendDirect().
DiameterPeer de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.diameterPeer
DiameterPeer API reference.
Definition at line 63 of file Communicator.java.
Referenced by de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.processMessage().
Peer de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.peer = null
Peer it is communicating for.
Definition at line 66 of file Communicator.java.
Referenced by de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.processMessage(), de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.run(), and de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.sendMessage().
boolean de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.running
Indicator if still active.
Definition at line 69 of file Communicator.java.
Referenced by de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.Communicator(), and de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.run().
int de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.direction
Direction of socket opening.
Definition at line 72 of file Communicator.java.
Referenced by de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.processMessage().
final int de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.Initiator = 0 [static]
Definition at line 73 of file Communicator.java.
Referenced by de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.processMessage().
final int de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.Receiver = 1 [static]
Definition at line 74 of file Communicator.java.
Socket de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.socket
Socket connected to.
Definition at line 77 of file Communicator.java.
Referenced by de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.I_Snd_CER(), de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.run(), de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.sendDirect(), and de.fhg.fokus.diameter.DiameterPeer.peer.StateMachine.Snd_CEA().
int de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.MAX_MESSAGE_LENGTH = 1048576 [static, private]
Definition at line 79 of file Communicator.java.
Referenced by de.fhg.fokus.diameter.DiameterPeer.transport.Communicator.run().