receiver.c

Go to the documentation of this file.
00001 
00055 #include <unistd.h>
00056 #include <errno.h>
00057 #include <string.h>
00058 #include <assert.h>
00059 #include <sys/time.h>
00060 #include <sys/types.h>
00061 #include <sys/socket.h>
00062 #include <sys/ioctl.h>
00063 #include <signal.h>
00064 #include <netinet/in.h>
00065 #include <netdb.h>
00066 #include <stdlib.h>
00067 #include <stdio.h>
00068 #include <time.h>
00069 #include <sys/stat.h>
00070 #include <fcntl.h>
00071 
00072 #include "utils.h"
00073 #include "globals.h"
00074 #include "diameter_api.h"
00075 #include "peerstatemachine.h"
00076 #include "peermanager.h"
00077 #include "config.h"
00078 
00079 #include "receiver.h"
00080 
00081 extern dp_config *config;       
00083 int dp_add_pid(pid_t pid);
00084 void dp_del_pid(pid_t pid);
00085 
00086 void receive_loop(int sock);
00087 
00088 void receive_message(AAAMessage *msg,int sock);
00089 
00090 peer *this_peer=0;          
00093 #define PIPE_PREFIX "/tmp/cdp_send_"
00094 int pipe_fd;        
00095 int pipe_fd_out;    
00096 str pipe_name;      
00101 static inline void set_peer_pipe()
00102 {
00103     if (!this_peer) return;
00104 //  lock_get(this_peer->lock);
00105     this_peer->send_pipe = pipe_name;
00106 //  lock_release(this_peer->lock);
00107 }
00108 
00114 void receiver_init(int sock,peer *p)
00115 {
00116     this_peer = p;
00117     
00118     pipe_name.s = shm_malloc(sizeof(PIPE_PREFIX)+64);
00119     sprintf(pipe_name.s,"%s%d_%d_%d",PIPE_PREFIX,(unsigned int) time(0),sock,getpid());
00120     pipe_name.len = strlen(pipe_name.s);
00121         
00122     set_peer_pipe();    
00123 
00124     mkfifo(pipe_name.s, 0666);  
00125     pipe_fd = open(pipe_name.s, O_RDONLY | O_NDELAY);
00126     if (pipe_fd<0){
00127         LOG(L_ERR,"ERROR:receiver_process(): FIFO open failed > %s\n",strerror(errno));
00128     }
00129     // we open it for writting just to keep it alive - won't close when all other writers close it
00130     pipe_fd_out = open(pipe_name.s, O_WRONLY);
00131 }
00132 
00138 void receiver_process(int sock)
00139 {
00140     LOG(L_INFO,"INFO:receiver_process(): [%d] Receiver process starting up...\n",sock);
00141 
00142         
00143     receive_loop(sock);
00144     LOG(L_INFO,"INFO:receiver_process(): [%d]... Receiver process cleaning-up.\n",sock);
00145     close(sock);
00146     close(pipe_fd);
00147     close(pipe_fd_out);
00148     remove(pipe_name.s);
00149     if (this_peer){
00150         lock_get(this_peer->lock);
00151         this_peer->send_pipe.s=0;
00152         this_peer->send_pipe.len=0;
00153         lock_release(this_peer->lock);
00154     }
00155     shm_free(pipe_name.s);
00156 //done:     
00157     /* remove pid from list of running processes */
00158     dp_del_pid(getpid());
00159     
00160 #ifdef CDP_FOR_SER
00161     drop_my_process();      
00162 #else
00163 #ifdef PKG_MALLOC
00164     #ifdef PKG_MALLOC
00165         LOG(memlog, "Receiver[%d] Memory status (pkg):\n",sock);
00166         //pkg_status();
00167         #ifdef pkg_sums
00168             pkg_sums();
00169         #endif 
00170     #endif
00171 #endif
00172 #endif      
00173         
00174     LOG(L_INFO,"INFO:receiver_process(): [%d]... Receiver process finished.\n",sock);
00175     exit(0);
00176 }
00177 
00187 static inline int select_recv(int s,void * buf,int len,int opt)
00188 {
00189     fd_set rfds,efds;
00190     struct timeval tv;
00191     int n,max;
00192     AAAMessage *msg=0;
00193     int r=0,cnt=0;
00194     
00195 //  if (shutdownx) return -1;
00196     max = s;
00197     if (pipe_fd>max) max = pipe_fd;
00198     n = 0;
00199     
00200     while(!n){
00201         if (shutdownx&&*shutdownx) break;   
00202 
00203         FD_ZERO(&rfds);
00204         FD_SET(s,&rfds);
00205         FD_SET(pipe_fd,&rfds);
00206         FD_ZERO(&efds);
00207         FD_SET(s,&efds);
00208         tv.tv_sec=1;
00209         tv.tv_usec=0;
00210 
00211 //      LOG(L_CRIT,"ERROR:select_recv(): HERE\n");
00212         
00213         n = select(max+1,&rfds,0,&efds,&tv);
00214         if (n==-1){
00215             if (shutdownx&&*shutdownx) return -1;
00216             LOG(L_ERR,"ERROR:select_recv(): %s\n",strerror(errno));
00217             return -1;
00218         }else
00219             if (n){
00220                 if (FD_ISSET(s,&efds)) return -1;               
00221                 if (FD_ISSET(pipe_fd,&rfds)) {                  
00222                     LOG(L_DBG,"DBG:select_recv(): There is something on the pipe\n");
00223                     cnt = read(pipe_fd,&msg,sizeof(AAAMessage *));
00224                     LOG(L_DBG,"DBG:select_recv(): Pipe says [%p] %d\n",msg,cnt);
00225                     if (cnt==0){
00226                         //This is very stupid and might not work well - droped messages... to be fixed
00227                         LOG(L_INFO,"INFO:select_recv(): ReOpening pipe for read. This should not happen...\n");
00228                         close(pipe_fd);
00229                         pipe_fd = open(pipe_name.s, O_RDONLY | O_NDELAY);
00230                         goto receive;
00231                     }
00232                     if (cnt<sizeof(AAAMessage *)){
00233                         if (cnt<0) LOG(L_ERR,"ERROR:select_recv(): Error reading from pipe\n");
00234                         r = -1;
00235                         goto receive;
00236                     }   
00237                     
00238                     while( (cnt=write(s,msg->buf.s,msg->buf.len))==-1 ) {
00239                         if (errno==EINTR)
00240                             continue;
00241                         LOG(L_ERR,"ERROR:select_recv(): write returned error> %s\n",
00242                             strerror(errno));
00243                         close(s);
00244                         AAAFreeMessage(&msg);       
00245                         r = -1;
00246                         return r;
00247                     }
00248                                             
00249                     if (cnt!=msg->buf.len){
00250                         LOG(L_ERR,"ERROR:select_recv(): only wrote %d/%d bytes\n",cnt,msg->buf.len);
00251                         close(s);
00252                         AAAFreeMessage(&msg);       
00253                         r = -1;
00254                         return r;
00255                     }
00256                     AAAFreeMessage(&msg);
00257                     //don't return, maybe there is something to read
00258                 }
00259 receive:
00260                 if (FD_ISSET(s,&rfds)) {
00261                     cnt = recv(s,buf,len,opt);
00262                     if (cnt==0) return -1;
00263                     else return cnt;
00264                 }
00265             }
00266         //LOG(L_ERR,".");
00267     }
00268     return r;
00269 }
00270 
00272 #define hdr_len 20
00273 
00280 void receive_loop(int sock)
00281 {
00282     char buf[hdr_len],*msg;
00283     int buf_len,length,version,cnt,msg_len;
00284     AAAMessage *dmsg;
00285     
00286     while(!*shutdownx){
00287         buf_len=0;
00288         while(buf_len<1){   
00289             cnt = select_recv(sock,buf+buf_len,1,0);
00290             if (cnt<0) goto error;
00291             buf_len+=cnt;
00292         }
00293         version = (unsigned char)buf[0];
00294         if (version!=1) {
00295             LOG(L_ERR,"ERROR:receive_loop():[%d] Recv Unknown version [%d]\n",sock,(unsigned char)buf[0]);
00296             continue;           
00297         }       
00298         while(buf_len<hdr_len){
00299             cnt = select_recv(sock,buf+buf_len,hdr_len-buf_len,0);
00300             if (cnt<0) goto error;
00301             buf_len+=cnt;
00302         }
00303         length = get_3bytes(buf+1);
00304         if (length>DP_MAX_MSG_LENGTH){
00305             LOG(L_ERR,"ERROR:receive_loop():[%d] Msg too big [%d] bytes\n",sock,length);
00306             goto error;
00307         }
00308         LOG(L_DBG,"DBG:receive_loop():[%d] Recv Version %d Length %d\n",sock,version,length);
00309         msg = shm_malloc(length);
00310         if (!msg) {
00311             LOG_NO_MEM("shm",length);
00312             goto error;
00313         }
00314             
00315         memcpy(msg,buf,hdr_len);
00316         msg_len=hdr_len;
00317         while(msg_len<length){
00318             cnt = select_recv(sock,msg+msg_len,length-msg_len,0);
00319             if (cnt<0) {
00320                 shm_free(msg);
00321                 goto error;
00322             }
00323             msg_len+=cnt;
00324         }
00325         LOG(L_DBG,"DBG:receive_loop():[%d] Recv message complete\n",sock);
00326         
00327         dmsg = AAATranslateMessage((unsigned char*)msg,(unsigned int)msg_len,1);
00328         
00329         /*shm_free(msg);*/
00330 
00331         if (dmsg) receive_message(dmsg,sock);
00332         else{
00333             shm_free(msg);
00334         }
00335         
00336     }
00337 error:
00338     if (this_peer) {
00339         if (this_peer->I_sock == sock) sm_process(this_peer,I_Peer_Disc,0,0,sock);
00340         if (this_peer->R_sock == sock) sm_process(this_peer,R_Peer_Disc,0,0,sock);
00341     }
00342     LOG(L_ERR,"INFO:receive_loop():[%d] Client closed connection or error... BYE\n",sock);
00343 }
00344 
00350 int peer_connect(peer *p)
00351 {
00352     int sock;
00353     int pid;
00354     unsigned int option = 1;
00355     
00356     struct addrinfo *ainfo=0,*res=0,hints;      
00357     char buf[256],host[256],serv[256];
00358     int error;
00359 
00360     memset (&hints, 0, sizeof(hints));
00361     //hints.ai_protocol = IPPROTO_SCTP;
00362     //hints.ai_protocol = IPPROTO_TCP;
00363     hints.ai_flags = AI_ADDRCONFIG;
00364     hints.ai_socktype = SOCK_STREAM;
00365     
00366     sprintf(buf,"%d",p->port);
00367 
00368     error = getaddrinfo(p->fqdn.s, buf, &hints, &res);
00369 
00370     if (error!=0){
00371         LOG(L_WARN,"WARNING:peer_connect(): Error opening connection to %.*s:%d >%s\n",
00372             p->fqdn.len,p->fqdn.s,p->port,gai_strerror(error));
00373         goto error;
00374     }
00375         
00376     for(ainfo = res;ainfo;ainfo = ainfo->ai_next)
00377     {
00378         if (getnameinfo(ainfo->ai_addr,ainfo->ai_addrlen,
00379             host,256,serv,256,NI_NUMERICHOST|NI_NUMERICSERV)==0){
00380                 LOG(L_WARN,"INFO:peer_connect(): Trying to connect to %s port %s\n",
00381                     host,serv);
00382         }               
00383 
00384         if ((sock = socket(ainfo->ai_family, ainfo->ai_socktype, ainfo->ai_protocol)) == -1) {
00385             LOG(L_ERR,"ERROR:create_socket(): error creating client socket to %s port %s >"
00386                 " %s\n",host,serv,strerror(errno));
00387             continue;
00388         }
00389 
00390         if (connect(sock,ainfo->ai_addr,ainfo->ai_addrlen)!=0) {
00391             LOG(L_WARN,"WARNING:peer_connect(): Error opening connection to to %s port %s >%s\n",
00392                 host,serv,strerror(errno));
00393             close(sock);        
00394             continue;
00395         }
00396     
00397         setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
00398     
00399         LOG(L_INFO,"INFO:peer_connect(): Peer %.*s:%d connected\n",p->fqdn.len,p->fqdn.s,p->port);
00400 
00401     
00402         receiver_init(sock,p);
00403     
00404         #ifdef CDP_FOR_SER
00405             pid = fork_process(p->port,"receiver I",0);
00406         #else
00407             pid = fork();
00408         #endif
00409         if (pid<0){
00410             LOG(L_ERR,"ERROR:peer_connect(): fork() failed > %s\n",strerror(errno));
00411             goto error;
00412         }
00413         if (pid==0){
00414             /* child */
00415             receiver_process(sock);
00416             LOG(L_CRIT,"ERROR:peer_connect(): receiver_process finished without exit!\n");
00417             exit(-1);
00418         }else{
00419             /* parent */
00420             LOG(L_INFO,"INFO:peer_connect(): Receiver process forked [%d]\n",pid);
00421             
00422             dp_add_pid(pid);
00423         }
00424         
00425         if (res) freeaddrinfo(res);         
00426         return sock;
00427     }
00428 error:
00429     if (res) freeaddrinfo(res); 
00430     return -1;  
00431 }
00432 
00433 
00444 int peer_send_msg(peer *p,AAAMessage *msg)
00445 {
00446     int fd,n;
00447     if (!AAABuildMsgBuffer(msg)) return 0;
00448     if (!p->send_pipe.s) {
00449         LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s has no attached send pipe\n",p->fqdn.len,p->fqdn.s);
00450         return 0;
00451     }
00452     fd = open(p->send_pipe.s,O_WRONLY);
00453     if (fd<0){
00454         LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s error on pipe open > %s\n",p->fqdn.len,p->fqdn.s,strerror(errno));      
00455         return 0;
00456     }
00457     LOG(L_DBG,"DBG:peer_send_msg(): Pipe push [%p]\n",msg);
00458     n = write(fd,&msg,sizeof(AAAMessage *));
00459     if (n<0) {
00460         LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s error on pipe write > %s\n",p->fqdn.len,p->fqdn.s,strerror(errno));     
00461         close(fd);
00462         return 0;
00463     }
00464     if (n!=sizeof(AAAMessage *)) {
00465         LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s error on pipe write > only %d bytes written\n",p->fqdn.len,p->fqdn.s,n);        
00466         close(fd);
00467         return 0;
00468     }
00469     close(fd);
00470     return 1;
00471 }
00472 
00483 int peer_send(peer *p,int sock,AAAMessage *msg,int locked)
00484 {
00485     int n;
00486 //  LOG(L_CRIT,"[%d]\n",sock);
00487     
00488     if (!p||!msg||sock<0) return 0;
00489     
00490     if (!AAABuildMsgBuffer(msg)) return 0;
00491     
00492     if (!locked) lock_get(p->lock);
00493 
00494     while( (n=write(sock,msg->buf.s,msg->buf.len))==-1 ) {
00495         if (errno==EINTR)
00496             continue;
00497         LOG(L_ERR,"ERROR:peer_send(): write returned error: %s\n",
00498             strerror(errno));
00499         if (p->I_sock==sock) sm_process(p,I_Peer_Disc,0,1,p->I_sock);
00500         if (p->R_sock==sock) sm_process(p,R_Peer_Disc,0,1,p->R_sock);
00501         if (!locked) lock_release(p->lock);
00502         AAAFreeMessage(&msg);       
00503         return 0;
00504     }
00505 
00506     if (n!=msg->buf.len){
00507         LOG(L_ERR,"ERROR:peer_send(): only wrote %d/%d bytes\n",n,msg->buf.len);
00508         if (!locked) lock_release(p->lock);
00509         AAAFreeMessage(&msg);       
00510         return 0;
00511     }
00512     if (!locked) lock_release(p->lock);
00513     AAAFreeMessage(&msg);           
00514     return 1;   
00515 }
00516 
00517 
00524 void receive_message(AAAMessage *msg,int sock)
00525 {
00526     AAA_AVP *avp1,*avp2;
00527     LOG(L_DBG,"DBG:receive_message(): [%d] Recv msg %d\n",sock,msg->commandCode);
00528 
00529     if (!this_peer) {
00530         this_peer = get_peer_from_sock(sock);
00531         set_peer_pipe();
00532     }
00533     
00534     if (!this_peer){
00535         switch (msg->commandCode){
00536             case Code_CE:
00537                 if (is_req(msg)){
00538                     avp1 = AAAFindMatchingAVP(msg,msg->avpList.head,AVP_Origin_Host,0,0);
00539                     avp2 = AAAFindMatchingAVP(msg,msg->avpList.head,AVP_Origin_Realm,0,0);
00540                     if (avp1&&avp2){
00541                         this_peer = get_peer_from_fqdn(avp1->data,avp2->data);
00542                     }
00543                     if (!this_peer) {
00544                         LOG(L_ERR,"ERROR:receive_msg(): Received CER from unknown peer (accept unknown=%d) -ignored\n",
00545                             config->accept_unknown_peers);
00546                         AAAFreeMessage(&msg);
00547                     }else{
00548                         set_peer_pipe();                        
00549                         sm_process(this_peer,R_Conn_CER,msg,0,sock);
00550                     }
00551                 }
00552                 else{
00553                     LOG(L_ERR,"ERROR:receive_msg(): Received CEA from an unknown peer -ignored\n");
00554                     AAAFreeMessage(&msg);
00555                 }
00556                 break;
00557             default:
00558                 LOG(L_ERR,"ERROR:receive_msg(): Received non-CE from an unknown peer -ignored\n");
00559                 AAAFreeMessage(&msg);               
00560         }
00561     }else{
00562         touch_peer(this_peer);
00563         switch (this_peer->state){
00564             case Wait_I_CEA:
00565                 if (msg->commandCode!=Code_CE||is_req(msg)){
00566                     sm_process(this_peer,I_Rcv_Non_CEA,msg,0,sock);
00567                 }else
00568                     sm_process(this_peer,I_Rcv_CEA,msg,0,sock);
00569                 break;
00570             case I_Open:            
00571                 switch (msg->commandCode){
00572                     case Code_CE:
00573                         if (is_req(msg)) sm_process(this_peer,I_Rcv_CER,msg,0,sock);    
00574                                     else sm_process(this_peer,I_Rcv_CEA,msg,0,sock);
00575                         break;
00576                     case Code_DW:
00577                         if (is_req(msg)) sm_process(this_peer,I_Rcv_DWR,msg,0,sock);    
00578                                     else sm_process(this_peer,I_Rcv_DWA,msg,0,sock);
00579                         break;
00580                     case Code_DP:
00581                         if (is_req(msg)) sm_process(this_peer,I_Rcv_DPR,msg,0,sock);    
00582                                     else sm_process(this_peer,I_Rcv_DPA,msg,0,sock);
00583                         break;
00584                     default:
00585                         sm_process(this_peer,I_Rcv_Message,msg,0,sock);
00586                 }               
00587                 break;              
00588             case R_Open:            
00589                 switch (msg->commandCode){
00590                     case Code_CE:
00591                         if (is_req(msg)) sm_process(this_peer,R_Rcv_CER,msg,0,sock);    
00592                                     else sm_process(this_peer,R_Rcv_CEA,msg,0,sock);
00593                         break;
00594                     case Code_DW:
00595                         if (is_req(msg)) sm_process(this_peer,R_Rcv_DWR,msg,0,sock);    
00596                                     else sm_process(this_peer,R_Rcv_DWA,msg,0,sock);
00597                         break;
00598                     case Code_DP:
00599                         if (is_req(msg)) sm_process(this_peer,R_Rcv_DPR,msg,0,sock);    
00600                                     else sm_process(this_peer,R_Rcv_DPA,msg,0,sock);
00601                         break;
00602                     default:
00603                         sm_process(this_peer,R_Rcv_Message,msg,0,sock);
00604                 }               
00605                 break;              
00606             default:
00607                 LOG(L_ERR,"ERROR:receive_msg(): Received msg while peer in state %d -ignored\n",this_peer->state);
00608                 AAAFreeMessage(&msg);                               
00609         }
00610     }
00611 }

Generated on Tue Jul 29 04:19:11 2008 for Open IMS Core CSCFs by  doxygen 1.5.2