receiver.c File Reference


Detailed Description

CDiameterPeer Receiver process procedures.

Author:
Dragos Vingarzan vingarzan -at- fokus dot fraunhofer dot de

Definition in file receiver.c.

#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <assert.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <signal.h>
#include <netinet/in.h>
#include <netdb.h>
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "utils.h"
#include "globals.h"
#include "diameter_api.h"
#include "peerstatemachine.h"
#include "peermanager.h"
#include "config.h"
#include "receiver.h"

Go to the source code of this file.

Defines

#define PIPE_PREFIX   "/tmp/cdp_send_"
 prefix for the send FIFO pipes
#define hdr_len   20
 length of the Diameter message header

Functions

int dp_add_pid (pid_t pid)
 Add a pid to the local process list.
void dp_del_pid (pid_t pid)
 Delete a pid from the process list.
void receive_loop (int sock)
 Receive Loop for Diameter messages.
void receive_message (AAAMessage *msg, int sock)
 Receives a message and does basic processing or calls sm_process().
static void set_peer_pipe ()
 Sets the send pipe name for the peer serviced by this process.
void receiver_init (int sock, peer *p)
 Initializes the receiver.
void receiver_process (int sock)
 The Receiver Process - calls receive_loop() and never returns.
static int select_recv (int s, void *buf, int len, int opt)
 Select on sockets for receiving.
int peer_connect (peer *p)
 Initiate a connection to a peer.
int peer_send_msg (peer *p, AAAMessage *msg)
 Sends a message to a peer (to be called from other processes).
int peer_send (peer *p, int sock, AAAMessage *msg, int locked)
 Send a message to a peer (only to be called from the receiver process).

Variables

dp_config * config
 Configuration for this diameter peer.
peer * this_peer = 0
 pointer to the peer serviced by this process
int pipe_fd
 file descriptor for reading from the send pipe
int pipe_fd_out
 file descriptor for writing to the send pipe
str pipe_name
 full path to the pipe


Define Documentation

#define PIPE_PREFIX   "/tmp/cdp_send_"

prefix for the send FIFO pipes

Definition at line 93 of file receiver.c.

Referenced by receiver_init().

#define hdr_len   20

length of the Diameter message header

Definition at line 272 of file receiver.c.

Referenced by receive_loop().


Function Documentation

int dp_add_pid ( pid_t  pid  )  [inline]

Add a pid to the local process list.

Parameters:
pid newly forked pid
Returns:
1 on success or 0 on error

Definition at line 95 of file diameter_peer.c.

/*
 * dp_add_pid() body: appends a new pid_list_t node holding `pid` to the tail
 * of the doubly-linked pid_list.
 *
 * The node is allocated with shm_malloc() and the whole operation, including
 * the allocation, runs with pid_list_lock held.
 *
 * Returns 1 when the node was linked in, 0 when the allocation failed (the
 * lock is released before returning in both cases).
 *
 * NOTE(review): pid_list and pid_list_lock are dereferenced without a NULL
 * check - presumably they are set up before the first fork; confirm.
 */
00096 {
00097     pid_list_t *n;
00098     lock_get(pid_list_lock);
00099     n = shm_malloc(sizeof(pid_list_t));
00100     if (!n){
00101         LOG_NO_MEM("shm",sizeof(pid_list_t));
00102         lock_release(pid_list_lock);
00103         return 0;
00104     }
00105     n->pid = pid;
00106     n->next = 0; /* new node always becomes the last one */
00107     n->prev = pid_list->tail; /* 0 when the list is currently empty */
00108     if (!pid_list->head) pid_list->head = n; /* empty list: node is also the head */
00109     if (pid_list->tail) pid_list->tail->next = n;
00110     pid_list->tail = n;
00111     lock_release(pid_list_lock);
00112     return 1;
00113 }

void dp_del_pid ( pid_t  pid  )  [inline]

Delete a pid from the process list.

Parameters:
pid - the pid to remove

Definition at line 132 of file diameter_peer.c.

/*
 * dp_del_pid() body: removes the first node whose pid equals `pid` from the
 * doubly-linked pid_list and releases it with shm_free().
 *
 * The search and the unlink both run with pid_list_lock held. If the list is
 * empty or no node matches, the list is left untouched. Nothing is returned,
 * so the caller cannot tell whether a node was actually removed.
 */
00133 {   
00134     pid_list_t *i;
00135     lock_get(pid_list_lock);
00136     i = pid_list->head;
00137     if (!i) {
00138         lock_release(pid_list_lock);
00139         return;
00140     }
00141     while(i && i->pid!=pid) i = i->next; /* linear search; i==0 means not found */
00142     if (i){
00143         if (i->prev) i->prev->next = i->next;
00144         else pid_list->head = i->next; /* removing the head */
00145         if (i->next) i->next->prev = i->prev;
00146         else pid_list->tail = i->prev; /* removing the tail */
00147         shm_free(i);
00148     }
00149     lock_release(pid_list_lock);
00150 }

void receive_loop ( int  sock  ) 

Receive Loop for Diameter messages.

Decodes the message and calls receive_message().

Parameters:
sock - the socket to receive from
Returns:
when the socket is closed

Definition at line 280 of file receiver.c.

References AAATranslateMessage(), DP_MAX_MSG_LENGTH, get_3bytes, hdr_len, I_Peer_Disc, _peer_t::I_sock, LOG_NO_MEM, R_Peer_Disc, receive_message(), select_recv(), shutdownx, sm_process(), and this_peer.

Referenced by receiver_process().

/*
 * receive_loop() body: reads Diameter messages from `sock` until *shutdownx
 * becomes non-zero or a receive error occurs.
 *
 * Per message:
 *   1. read one byte and require it to be version 1 (any other value is
 *      logged and skipped, so the stream is re-scanned byte by byte);
 *   2. read the remainder of the hdr_len-byte header;
 *   3. take the total message length from the 3 bytes following the version
 *      (get_3bytes(buf+1)); this length counts the header as well, because
 *      the body read below starts at msg_len = hdr_len;
 *   4. allocate `length` bytes with shm_malloc(), copy the header in, and
 *      read the rest with select_recv();
 *   5. decode with AAATranslateMessage() and hand the result to
 *      receive_message(); the raw buffer is freed here only when decoding
 *      fails (presumably the decoded message keeps referencing it - confirm
 *      against AAATranslateMessage()).
 *
 * On leaving the loop (shutdown or error) the peer state machine is told
 * about the disconnect via I_Peer_Disc / R_Peer_Disc, depending on which of
 * this_peer's sockets `sock` is.
 *
 * Fix: the wire length is now also rejected when it is smaller than hdr_len.
 * Previously such a value led to shm_malloc(length) followed by
 * memcpy(msg,buf,hdr_len), writing past the end of the allocation.
 */
00281 {
00282     char buf[hdr_len],*msg;
00283     int buf_len,length,version,cnt,msg_len;
00284     AAAMessage *dmsg;
00285     
00286     while(!*shutdownx){
00287         buf_len=0;
00288         while(buf_len<1){   
00289             cnt = select_recv(sock,buf+buf_len,1,0);
00290             if (cnt<0) goto error;
00291             buf_len+=cnt; /* cnt may be 0 (nothing received this round) - keep waiting */
00292         }
00293         version = (unsigned char)buf[0];
00294         if (version!=1) {
00295             LOG(L_ERR,"ERROR:receive_loop():[%d] Recv Unknown version [%d]\n",sock,(unsigned char)buf[0]);
00296             continue;           
00297         }       
00298         while(buf_len<hdr_len){
00299             cnt = select_recv(sock,buf+buf_len,hdr_len-buf_len,0);
00300             if (cnt<0) goto error;
00301             buf_len+=cnt;
00302         }
00303         length = get_3bytes(buf+1);
00304         if (length<hdr_len||length>DP_MAX_MSG_LENGTH){ /* too short would overflow msg in the memcpy below */
00305             LOG(L_ERR,"ERROR:receive_loop():[%d] Msg length invalid or too big [%d] bytes\n",sock,length);
00306             goto error;
00307         }
00308         LOG(L_DBG,"DBG:receive_loop():[%d] Recv Version %d Length %d\n",sock,version,length);
00309         msg = shm_malloc(length);
00310         if (!msg) {
00311             LOG_NO_MEM("shm",length);
00312             goto error;
00313         }
00314             
00315         memcpy(msg,buf,hdr_len); /* safe: length >= hdr_len is guaranteed above */
00316         msg_len=hdr_len;
00317         while(msg_len<length){
00318             cnt = select_recv(sock,msg+msg_len,length-msg_len,0);
00319             if (cnt<0) {
00320                 shm_free(msg);
00321                 goto error;
00322             }
00323             msg_len+=cnt;
00324         }
00325         LOG(L_DBG,"DBG:receive_loop():[%d] Recv message complete\n",sock);
00326         
00327         dmsg = AAATranslateMessage((unsigned char*)msg,(unsigned int)msg_len,1);
00328         
00329         /*shm_free(msg);*/
00330 
00331         if (dmsg) receive_message(dmsg,sock);
00332         else{
00333             shm_free(msg); /* decoding failed: nobody else owns the raw buffer */
00334         }
00335         
00336     }
00337 error:
00338     if (this_peer) {
00339         if (this_peer->I_sock == sock) sm_process(this_peer,I_Peer_Disc,0,0,sock);
00340         if (this_peer->R_sock == sock) sm_process(this_peer,R_Peer_Disc,0,0,sock);
00341     }
00342     LOG(L_ERR,"INFO:receive_loop():[%d] Client closed connection or error... BYE\n",sock);
00343 }

void receive_message ( AAAMessage * msg,
int  sock 
)

Receives a message and does basic processing or calls sm_process().

This gets called from the receive_loop for every message that is received.

Parameters:
msg - the message received
sock - socket received on

Definition at line 524 of file receiver.c.

References AAAFindMatchingAVP(), AAAFreeMessage(), dp_config::accept_unknown_peers, AVP_Origin_Host, AVP_Origin_Realm, Code_CE, Code_DP, Code_DW, _message_t::commandCode, config, avp::data, get_peer_from_fqdn(), get_peer_from_sock(), I_Open, I_Rcv_CEA, I_Rcv_CER, I_Rcv_DPA, I_Rcv_DPR, I_Rcv_DWA, I_Rcv_DWR, I_Rcv_Message, I_Rcv_Non_CEA, is_req, R_Conn_CER, R_Open, R_Rcv_CEA, R_Rcv_CER, R_Rcv_DPA, R_Rcv_DPR, R_Rcv_DWA, R_Rcv_DWR, R_Rcv_Message, set_peer_pipe(), sm_process(), _peer_t::state, this_peer, touch_peer(), and Wait_I_CEA.

Referenced by receive_loop().

/*
 * receive_message() body: routes one decoded Diameter message to the peer
 * state machine, or drops it.
 *
 * Peer resolution:
 *   - if this_peer is not yet known, it is looked up from the socket;
 *   - if it is still unknown, only a Capabilities-Exchange *request* (CER)
 *     is considered: the peer is looked up by the Origin-Host/Origin-Realm
 *     AVPs and, when found, the message is fed to sm_process() as R_Conn_CER.
 *     Everything else from an unknown peer is logged and freed.
 *
 * Known peer: touch_peer() is called, then the event passed to sm_process()
 * is chosen from the peer state (Wait_I_CEA, I_Open, R_Open), the command
 * code (CE, DW, DP, other) and whether the message is a request. In any
 * other state the message is logged and freed.
 *
 * Ownership: every path that does not call sm_process() releases the message
 * with AAAFreeMessage(); presumably sm_process() takes ownership on the
 * other paths - confirm against peerstatemachine.
 */
00525 {
00526     AAA_AVP *avp1,*avp2;
00527     LOG(L_DBG,"DBG:receive_message(): [%d] Recv msg %d\n",sock,msg->commandCode);
00528 
00529     if (!this_peer) {
00530         this_peer = get_peer_from_sock(sock);
00531         set_peer_pipe(); /* returns immediately when this_peer is still 0 */
00532     }
00533     
00534     if (!this_peer){
00535         switch (msg->commandCode){
00536             case Code_CE:
00537                 if (is_req(msg)){
00538                     avp1 = AAAFindMatchingAVP(msg,msg->avpList.head,AVP_Origin_Host,0,0);
00539                     avp2 = AAAFindMatchingAVP(msg,msg->avpList.head,AVP_Origin_Realm,0,0);
00540                     if (avp1&&avp2){
00541                         this_peer = get_peer_from_fqdn(avp1->data,avp2->data);
00542                     }
00543                     if (!this_peer) {
00544                         LOG(L_ERR,"ERROR:receive_msg(): Received CER from unknown peer (accept unknown=%d) -ignored\n",
00545                             config->accept_unknown_peers); /* only logged here, not acted upon in this function */
00546                         AAAFreeMessage(&msg);
00547                     }else{
00548                         set_peer_pipe();                        
00549                         sm_process(this_peer,R_Conn_CER,msg,0,sock);
00550                     }
00551                 }
00552                 else{
00553                     LOG(L_ERR,"ERROR:receive_msg(): Received CEA from an unknown peer -ignored\n");
00554                     AAAFreeMessage(&msg);
00555                 }
00556                 break;
00557             default:
00558                 LOG(L_ERR,"ERROR:receive_msg(): Received non-CE from an unknown peer -ignored\n");
00559                 AAAFreeMessage(&msg);               
00560         }
00561     }else{
00562         touch_peer(this_peer);
00563         switch (this_peer->state){
00564             case Wait_I_CEA:
00565                 if (msg->commandCode!=Code_CE||is_req(msg)){ /* anything but a CE answer */
00566                     sm_process(this_peer,I_Rcv_Non_CEA,msg,0,sock);
00567                 }else
00568                     sm_process(this_peer,I_Rcv_CEA,msg,0,sock);
00569                 break;
00570             case I_Open:            
00571                 switch (msg->commandCode){
00572                     case Code_CE:
00573                         if (is_req(msg)) sm_process(this_peer,I_Rcv_CER,msg,0,sock);    
00574                                     else sm_process(this_peer,I_Rcv_CEA,msg,0,sock);
00575                         break;
00576                     case Code_DW:
00577                         if (is_req(msg)) sm_process(this_peer,I_Rcv_DWR,msg,0,sock);    
00578                                     else sm_process(this_peer,I_Rcv_DWA,msg,0,sock);
00579                         break;
00580                     case Code_DP:
00581                         if (is_req(msg)) sm_process(this_peer,I_Rcv_DPR,msg,0,sock);    
00582                                     else sm_process(this_peer,I_Rcv_DPA,msg,0,sock);
00583                         break;
00584                     default:
00585                         sm_process(this_peer,I_Rcv_Message,msg,0,sock);
00586                 }               
00587                 break;              
00588             case R_Open:            
00589                 switch (msg->commandCode){
00590                     case Code_CE:
00591                         if (is_req(msg)) sm_process(this_peer,R_Rcv_CER,msg,0,sock);    
00592                                     else sm_process(this_peer,R_Rcv_CEA,msg,0,sock);
00593                         break;
00594                     case Code_DW:
00595                         if (is_req(msg)) sm_process(this_peer,R_Rcv_DWR,msg,0,sock);    
00596                                     else sm_process(this_peer,R_Rcv_DWA,msg,0,sock);
00597                         break;
00598                     case Code_DP:
00599                         if (is_req(msg)) sm_process(this_peer,R_Rcv_DPR,msg,0,sock);    
00600                                     else sm_process(this_peer,R_Rcv_DPA,msg,0,sock);
00601                         break;
00602                     default:
00603                         sm_process(this_peer,R_Rcv_Message,msg,0,sock);
00604                 }               
00605                 break;              
00606             default:
00607                 LOG(L_ERR,"ERROR:receive_msg(): Received msg while peer in state %d -ignored\n",this_peer->state);
00608                 AAAFreeMessage(&msg);                               
00609         }
00610     }
00611 }

static void set_peer_pipe (  )  [inline, static]

Sets the send pipe name for the peer serviced by this process.

Definition at line 101 of file receiver.c.

References pipe_name, _peer_t::send_pipe, and this_peer.

Referenced by receive_message(), and receiver_init().

/*
 * set_peer_pipe() body: copies the process-wide pipe_name (pointer and
 * length, as a str value) into this_peer->send_pipe. Does nothing when no
 * peer is attached to this process yet.
 *
 * NOTE(review): the peer lock calls are commented out, so the assignment is
 * not synchronised here, while receiver_process() clears send_pipe under
 * this_peer->lock - confirm this asymmetry is intended.
 */
00102 {
00103     if (!this_peer) return;
00104 //  lock_get(this_peer->lock);
00105     this_peer->send_pipe = pipe_name;
00106 //  lock_release(this_peer->lock);
00107 }

void receiver_init ( int  sock,
peer * p 
)

Initializes the receiver.

Parameters:
sock - the socket to initialize with
p - the peer to initialize with

Definition at line 114 of file receiver.c.

References errno, pipe_fd, pipe_fd_out, pipe_name, PIPE_PREFIX, set_peer_pipe(), and this_peer.

Referenced by accept_connection().

/*
 * receiver_init() body: binds this process to peer `p` and creates the FIFO
 * through which other processes submit messages to be sent.
 *
 * Steps:
 *   - this_peer = p;
 *   - build the FIFO path PIPE_PREFIX + "<time>_<sock>_<pid>" in a buffer
 *     obtained from shm_malloc() and store it in pipe_name;
 *   - publish the name on the peer via set_peer_pipe();
 *   - mkfifo() the path, open it non-blocking for reading (pipe_fd) and once
 *     more for writing (pipe_fd_out).
 *
 * `sock` is used only as part of the FIFO name.
 *
 * NOTE(review): the shm_malloc() result is passed to sprintf() without a
 * NULL check; the mkfifo() result and the second open() result are ignored;
 * a failed first open() is only logged and pipe_fd stays negative.
 * NOTE(review): the time value is cast to unsigned int but printed with %d.
 */
00115 {
00116     this_peer = p;
00117     
00118     pipe_name.s = shm_malloc(sizeof(PIPE_PREFIX)+64); /* prefix (incl. NUL) plus 64 bytes for the three numbers and separators */
00119     sprintf(pipe_name.s,"%s%d_%d_%d",PIPE_PREFIX,(unsigned int) time(0),sock,getpid());
00120     pipe_name.len = strlen(pipe_name.s);
00121         
00122     set_peer_pipe();    
00123 
00124     mkfifo(pipe_name.s, 0666);  
00125     pipe_fd = open(pipe_name.s, O_RDONLY | O_NDELAY); /* non-blocking so the open does not wait for a writer */
00126     if (pipe_fd<0){
00127         LOG(L_ERR,"ERROR:receiver_process(): FIFO open failed > %s\n",strerror(errno));
00128     }
00129     // we open it for writting just to keep it alive - won't close when all other writers close it
00130     pipe_fd_out = open(pipe_name.s, O_WRONLY);
00131 }

void receiver_process ( int  sock  ) 

The Receiver Process - calls receive_loop() and never returns.

Parameters:
sock - socket to receive data from
Returns:
never, when disconnected it will exit

Definition at line 138 of file receiver.c.

References dp_del_pid(), _peer_t::lock, memlog, pipe_fd, pipe_fd_out, pipe_name, receive_loop(), _peer_t::send_pipe, and this_peer.

Referenced by accept_connection().

/*
 * receiver_process() body: runs receive_loop() on `sock` and, once it
 * returns, tears the receiver down and terminates the process with exit(0).
 *
 * Clean-up order: close the socket and both FIFO descriptors, remove the
 * FIFO file, clear this_peer->send_pipe under the peer lock, free the shared
 * pipe_name buffer, and drop this pid from the process list.
 *
 * With CDP_FOR_SER defined, drop_my_process() is called; otherwise an
 * optional pkg memory summary is logged when PKG_MALLOC (and pkg_sums) are
 * defined.
 *
 * NOTE(review): the nested "#ifdef PKG_MALLOC" is duplicated - harmless, but
 * one level is redundant.
 */
00139 {
00140     LOG(L_INFO,"INFO:receiver_process(): [%d] Receiver process starting up...\n",sock);
00141 
00142         
00143     receive_loop(sock);
00144     LOG(L_INFO,"INFO:receiver_process(): [%d]... Receiver process cleaning-up.\n",sock);
00145     close(sock);
00146     close(pipe_fd);
00147     close(pipe_fd_out);
00148     remove(pipe_name.s);
00149     if (this_peer){
00150         lock_get(this_peer->lock);
00151         this_peer->send_pipe.s=0; /* so peer_send_msg() reports "no attached send pipe" from now on */
00152         this_peer->send_pipe.len=0;
00153         lock_release(this_peer->lock);
00154     }
00155     shm_free(pipe_name.s);
00156 //done:     
00157     /* remove pid from list of running processes */
00158     dp_del_pid(getpid());
00159     
00160 #ifdef CDP_FOR_SER
00161     drop_my_process();      
00162 #else
00163 #ifdef PKG_MALLOC
00164     #ifdef PKG_MALLOC
00165         LOG(memlog, "Receiver[%d] Memory status (pkg):\n",sock);
00166         //pkg_status();
00167         #ifdef pkg_sums
00168             pkg_sums();
00169         #endif 
00170     #endif
00171 #endif
00172 #endif      
00173         
00174     LOG(L_INFO,"INFO:receiver_process(): [%d]... Receiver process finished.\n",sock);
00175     exit(0);
00176 }

static int select_recv ( int  s,
void *  buf,
int  len,
int  opt 
) [inline, static]

Select on sockets for receiving.

Selects on both the socket and on the send pipe.

Parameters:
s - the receive socket
buf - buffer to read into
len - max length of the read buffer
opt - recv() flags
Returns:
number of bytes read or -1 on error

Definition at line 187 of file receiver.c.

References AAAFreeMessage(), _message_t::buf, errno, pipe_fd, pipe_name, select, and shutdownx.

Referenced by receive_loop().

/*
 * select_recv() body: multiplexes the receive socket `s` and the send pipe
 * (pipe_fd) with select(), using a 1 second timeout per round, and loops
 * until one of them has activity or the shutdown flag is set.
 *
 * Pipe readable: one AAAMessage pointer is read from the pipe, the message's
 * encoded buffer (msg->buf) is written to `s`, and the message is freed with
 * AAAFreeMessage(). A zero-length read re-opens the pipe.
 *
 * Socket readable: a single recv() of at most `len` bytes into `buf`.
 *
 * Returns:
 *   > 0  number of bytes received from the socket;
 *   0    shutdown was requested, or only pipe traffic was handled this call;
 *   -1   select() failed, the socket had an exceptional condition, the
 *        remote side closed (recv() returned 0), the pipe read was short or
 *        failed, or the write to the socket failed or was incomplete.
 *   A negative recv() result is passed through unchanged.
 *
 * Fix: the short-read check now compares against (int)sizeof(AAAMessage *).
 * Before, `cnt` was converted to size_t for the comparison, so a read()
 * result of -1 became a huge unsigned value, the check was skipped, and the
 * code went on to dereference `msg` although no pointer had been read.
 *
 * NOTE(review): on a write failure the socket is closed here and then again
 * by receiver_process() after receive_loop() returns - confirm the double
 * close() is acceptable.
 * NOTE(review): pipe_fd is passed to FD_SET() even if the open() in
 * receiver_init() failed and left it negative.
 */
00188 {
00189     fd_set rfds,efds;
00190     struct timeval tv;
00191     int n,max;
00192     AAAMessage *msg=0;
00193     int r=0,cnt=0;
00194     
00195 //  if (shutdownx) return -1;
00196     max = s;
00197     if (pipe_fd>max) max = pipe_fd;
00198     n = 0;
00199     
00200     while(!n){ /* n==0 means the select() timed out: poll the shutdown flag and retry */
00201         if (shutdownx&&*shutdownx) break;   
00202 
00203         FD_ZERO(&rfds);
00204         FD_SET(s,&rfds);
00205         FD_SET(pipe_fd,&rfds);
00206         FD_ZERO(&efds);
00207         FD_SET(s,&efds);
00208         tv.tv_sec=1;
00209         tv.tv_usec=0;
00210 
00211 //      LOG(L_CRIT,"ERROR:select_recv(): HERE\n");
00212         
00213         n = select(max+1,&rfds,0,&efds,&tv);
00214         if (n==-1){
00215             if (shutdownx&&*shutdownx) return -1;
00216             LOG(L_ERR,"ERROR:select_recv(): %s\n",strerror(errno));
00217             return -1;
00218         }else
00219             if (n){
00220                 if (FD_ISSET(s,&efds)) return -1;               
00221                 if (FD_ISSET(pipe_fd,&rfds)) {                  
00222                     LOG(L_DBG,"DBG:select_recv(): There is something on the pipe\n");
00223                     cnt = read(pipe_fd,&msg,sizeof(AAAMessage *));
00224                     LOG(L_DBG,"DBG:select_recv(): Pipe says [%p] %d\n",msg,cnt);
00225                     if (cnt==0){
00226                         //This is very stupid and might not work well - droped messages... to be fixed
00227                         LOG(L_INFO,"INFO:select_recv(): ReOpening pipe for read. This should not happen...\n");
00228                         close(pipe_fd);
00229                         pipe_fd = open(pipe_name.s, O_RDONLY | O_NDELAY);
00230                         goto receive;
00231                     }
00232                     if (cnt<(int)sizeof(AAAMessage *)){ /* signed compare so that cnt==-1 is caught */
00233                         if (cnt<0) LOG(L_ERR,"ERROR:select_recv(): Error reading from pipe\n");
00234                         r = -1;
00235                         goto receive;
00236                     }   
00237                     
00238                     while( (cnt=write(s,msg->buf.s,msg->buf.len))==-1 ) {
00239                         if (errno==EINTR)
00240                             continue;
00241                         LOG(L_ERR,"ERROR:select_recv(): write returned error> %s\n",
00242                             strerror(errno));
00243                         close(s);
00244                         AAAFreeMessage(&msg);       
00245                         r = -1;
00246                         return r;
00247                     }
00248                                             
00249                     if (cnt!=msg->buf.len){
00250                         LOG(L_ERR,"ERROR:select_recv(): only wrote %d/%d bytes\n",cnt,msg->buf.len);
00251                         close(s);
00252                         AAAFreeMessage(&msg);       
00253                         r = -1;
00254                         return r;
00255                     }
00256                     AAAFreeMessage(&msg);
00257                     //don't return, maybe there is something to read
00258                 }
00259 receive:
00260                 if (FD_ISSET(s,&rfds)) {
00261                     cnt = recv(s,buf,len,opt);
00262                     if (cnt==0) return -1; /* orderly shutdown by the remote side */
00263                     else return cnt;
00264                 }
00265             }
00266         //LOG(L_ERR,".");
00267     }
00268     return r;
00269 }

int peer_connect ( peer * p  ) 

Initiate a connection to a peer.

Parameters:
p - peer to connect to
Returns:
socket if OK, -1 on error

Definition at line 350 of file receiver.c.

References errno, _peer_t::fqdn, and _peer_t::port.

Referenced by I_Snd_Conn_Req().

/*
 * peer_connect() body: resolves p->fqdn / p->port with getaddrinfo()
 * (SOCK_STREAM, AI_ADDRCONFIG), tries each returned address in order, and on
 * the first successful connect():
 *   - calls receiver_init(sock,p) in the current process,
 *   - forks a receiver process (fork_process() under CDP_FOR_SER, fork()
 *     otherwise) whose child runs receiver_process(sock),
 *   - records the child pid with dp_add_pid() in the parent,
 *   - frees the address list and returns the connected socket.
 *
 * Returns -1 when resolution fails, when no address could be connected, or
 * when the fork fails.
 *
 * NOTE(review): on fork failure the code jumps to `error` without closing
 * `sock`, so the connected socket is leaked.
 * NOTE(review): if getnameinfo() fails, `host` and `serv` are never written
 * but are still printed with %s in the later socket()/connect() error logs.
 * NOTE(review): SO_REUSEADDR is set after connect() has already succeeded
 * and its result is ignored - confirm it has the intended effect here.
 */
00351 {
00352     int sock;
00353     int pid;
00354     unsigned int option = 1;
00355     
00356     struct addrinfo *ainfo=0,*res=0,hints;      
00357     char buf[256],host[256],serv[256];
00358     int error;
00359 
00360     memset (&hints, 0, sizeof(hints));
00361     //hints.ai_protocol = IPPROTO_SCTP;
00362     //hints.ai_protocol = IPPROTO_TCP;
00363     hints.ai_flags = AI_ADDRCONFIG;
00364     hints.ai_socktype = SOCK_STREAM;
00365     
00366     sprintf(buf,"%d",p->port); /* getaddrinfo() takes the service as a string */
00367 
00368     error = getaddrinfo(p->fqdn.s, buf, &hints, &res);
00369 
00370     if (error!=0){
00371         LOG(L_WARN,"WARNING:peer_connect(): Error opening connection to %.*s:%d >%s\n",
00372             p->fqdn.len,p->fqdn.s,p->port,gai_strerror(error));
00373         goto error;
00374     }
00375         
00376     for(ainfo = res;ainfo;ainfo = ainfo->ai_next)
00377     {
00378         if (getnameinfo(ainfo->ai_addr,ainfo->ai_addrlen,
00379             host,256,serv,256,NI_NUMERICHOST|NI_NUMERICSERV)==0){
00380                 LOG(L_WARN,"INFO:peer_connect(): Trying to connect to %s port %s\n",
00381                     host,serv);
00382         }               
00383 
00384         if ((sock = socket(ainfo->ai_family, ainfo->ai_socktype, ainfo->ai_protocol)) == -1) {
00385             LOG(L_ERR,"ERROR:create_socket(): error creating client socket to %s port %s >"
00386                 " %s\n",host,serv,strerror(errno));
00387             continue;
00388         }
00389 
00390         if (connect(sock,ainfo->ai_addr,ainfo->ai_addrlen)!=0) {
00391             LOG(L_WARN,"WARNING:peer_connect(): Error opening connection to to %s port %s >%s\n",
00392                 host,serv,strerror(errno));
00393             close(sock);        
00394             continue; /* try the next resolved address */
00395         }
00396     
00397         setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
00398     
00399         LOG(L_INFO,"INFO:peer_connect(): Peer %.*s:%d connected\n",p->fqdn.len,p->fqdn.s,p->port);
00400 
00401     
00402         receiver_init(sock,p); /* runs before the fork, i.e. in the calling process */
00403     
00404         #ifdef CDP_FOR_SER
00405             pid = fork_process(p->port,"receiver I",0);
00406         #else
00407             pid = fork();
00408         #endif
00409         if (pid<0){
00410             LOG(L_ERR,"ERROR:peer_connect(): fork() failed > %s\n",strerror(errno));
00411             goto error;
00412         }
00413         if (pid==0){
00414             /* child */
00415             receiver_process(sock);
00416             LOG(L_CRIT,"ERROR:peer_connect(): receiver_process finished without exit!\n");
00417             exit(-1);
00418         }else{
00419             /* parent */
00420             LOG(L_INFO,"INFO:peer_connect(): Receiver process forked [%d]\n",pid);
00421             
00422             dp_add_pid(pid);
00423         }
00424         
00425         if (res) freeaddrinfo(res);         
00426         return sock;
00427     }
00428 error:
00429     if (res) freeaddrinfo(res); 
00430     return -1;  
00431 }

int peer_send_msg ( peer * p,
AAAMessage * msg 
)

Sends a message to a peer (to be called from other processes).

This just writes the pointer to the message in the send pipe. The specific peer process will pick that up and send the message, as only that specific process has the id of socket (we are forking the peers dynamically and as such, the sockets are not visible between processes).

Parameters:
p - the peer to send to
msg - the message to send
Returns:
1 on success, 0 on failure

Definition at line 444 of file receiver.c.

References AAABuildMsgBuffer(), errno, _peer_t::fqdn, and _peer_t::send_pipe.

Referenced by AAASendMessage(), AAASendMessageToPeer(), AAASendRecvMessage(), AAASendRecvMessageToPeer(), api_callback(), I_Snd_CER(), Snd_DPA(), Snd_DPR(), Snd_DWA(), Snd_DWR(), and Snd_Message().

/*
 * peer_send_msg() body: queues `msg` for transmission by the receiver
 * process that owns the peer's socket.
 *
 * The message is first encoded with AAABuildMsgBuffer(); then the peer's
 * send FIFO (p->send_pipe) is opened for writing and the *pointer value* of
 * `msg` is written to it. select_recv() in the receiver process reads that
 * pointer, writes msg->buf to the socket and frees the message.
 *
 * Returns 1 when the full pointer was written, 0 when encoding fails, the
 * peer has no send pipe, the FIFO cannot be opened, or the write fails or is
 * short. The descriptor is closed on every path after a successful open().
 *
 * NOTE(review): `p` is dereferenced without a NULL check (peer_send() does
 * check it).
 * NOTE(review): on the failure paths the message is not freed here -
 * presumably the caller keeps ownership in that case; confirm.
 */
00445 {
00446     int fd,n;
00447     if (!AAABuildMsgBuffer(msg)) return 0;
00448     if (!p->send_pipe.s) {
00449         LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s has no attached send pipe\n",p->fqdn.len,p->fqdn.s);
00450         return 0;
00451     }
00452     fd = open(p->send_pipe.s,O_WRONLY);
00453     if (fd<0){
00454         LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s error on pipe open > %s\n",p->fqdn.len,p->fqdn.s,strerror(errno));      
00455         return 0;
00456     }
00457     LOG(L_DBG,"DBG:peer_send_msg(): Pipe push [%p]\n",msg);
00458     n = write(fd,&msg,sizeof(AAAMessage *)); /* sends the pointer itself, not the message bytes */
00459     if (n<0) {
00460         LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s error on pipe write > %s\n",p->fqdn.len,p->fqdn.s,strerror(errno));     
00461         close(fd);
00462         return 0;
00463     }
00464     if (n!=sizeof(AAAMessage *)) {
00465         LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s error on pipe write > only %d bytes written\n",p->fqdn.len,p->fqdn.s,n);        
00466         close(fd);
00467         return 0;
00468     }
00469     close(fd);
00470     return 1;
00471 }

int peer_send ( peer * p,
int  sock,
AAAMessage * msg,
int  locked 
)

Send a message to a peer (only to be called from the receiver process).

This directly writes the message on the socket. It is used for transmission during the Capability Exchange procedure, when the send pipes are not opened yet.

Parameters:
p - the peer to send to
sock - the socket to send through
msg - the message to send
locked - whether the caller locked the peer already
Returns:
1 on success, 0 on error

Definition at line 483 of file receiver.c.

References AAABuildMsgBuffer(), AAAFreeMessage(), _message_t::buf, errno, I_Peer_Disc, _peer_t::I_sock, _peer_t::lock, R_Peer_Disc, and sm_process().

Referenced by Snd_CEA().

/*
 * peer_send() body: encodes `msg` with AAABuildMsgBuffer() and writes it
 * directly to `sock`, retrying the write() when it is interrupted (EINTR).
 *
 * Locking: p->lock is taken around the write unless `locked` is non-zero,
 * and released again on every exit path after it was taken.
 *
 * Returns 1 when the whole buffer was written, 0 otherwise.
 *
 * Ownership: once encoding has succeeded the message is released with
 * AAAFreeMessage() on every path, success or failure. It is NOT released
 * when the arguments are invalid or when AAABuildMsgBuffer() fails.
 *
 * On a write error the peer state machine is notified with I_Peer_Disc or
 * R_Peer_Disc, depending on which of the peer's sockets `sock` is; the
 * fourth sm_process() argument is 1 here (presumably "peer already locked" -
 * confirm).
 *
 * NOTE(review): a short write only logs and returns 0; unlike the error
 * path it does not notify the state machine.
 */
00484 {
00485     int n;
00486 //  LOG(L_CRIT,"[%d]\n",sock);
00487     
00488     if (!p||!msg||sock<0) return 0;
00489     
00490     if (!AAABuildMsgBuffer(msg)) return 0;
00491     
00492     if (!locked) lock_get(p->lock);
00493 
00494     while( (n=write(sock,msg->buf.s,msg->buf.len))==-1 ) {
00495         if (errno==EINTR)
00496             continue;
00497         LOG(L_ERR,"ERROR:peer_send(): write returned error: %s\n",
00498             strerror(errno));
00499         if (p->I_sock==sock) sm_process(p,I_Peer_Disc,0,1,p->I_sock);
00500         if (p->R_sock==sock) sm_process(p,R_Peer_Disc,0,1,p->R_sock);
00501         if (!locked) lock_release(p->lock);
00502         AAAFreeMessage(&msg);       
00503         return 0;
00504     }
00505 
00506     if (n!=msg->buf.len){
00507         LOG(L_ERR,"ERROR:peer_send(): only wrote %d/%d bytes\n",n,msg->buf.len);
00508         if (!locked) lock_release(p->lock);
00509         AAAFreeMessage(&msg);       
00510         return 0;
00511     }
00512     if (!locked) lock_release(p->lock);
00513     AAAFreeMessage(&msg);           
00514     return 1;   
00515 }


Variable Documentation

dp_config* config

Configuration for this diameter peer.

Definition at line 76 of file diameter_peer.c.

peer* this_peer = 0

pointer to the peer serviced by this process

Definition at line 90 of file receiver.c.

Referenced by receive_loop(), receive_message(), receiver_init(), receiver_process(), and set_peer_pipe().

int pipe_fd

file descriptor for reading from the send pipe

Definition at line 94 of file receiver.c.

Referenced by receiver_init(), receiver_process(), and select_recv().

int pipe_fd_out

file descriptor for writing to the send pipe

Definition at line 95 of file receiver.c.

Referenced by receiver_init(), and receiver_process().

str pipe_name

full path to the pipe

Definition at line 96 of file receiver.c.

Referenced by receiver_init(), receiver_process(), select_recv(), and set_peer_pipe().


Generated on Fri Jul 18 04:14:01 2008 for Open IMS Core CSCFs by  doxygen 1.5.2