00001
00055 #include <unistd.h>
00056 #include <errno.h>
00057 #include <string.h>
00058 #include <assert.h>
00059 #include <sys/time.h>
00060 #include <sys/types.h>
00061 #include <sys/socket.h>
00062 #include <sys/ioctl.h>
00063 #include <signal.h>
00064 #include <netinet/in.h>
00065 #include <netdb.h>
00066 #include <stdlib.h>
00067 #include <stdio.h>
00068 #include <time.h>
00069 #include <sys/stat.h>
00070 #include <fcntl.h>
00071
00072 #include "utils.h"
00073 #include "globals.h"
00074 #include "diameter_api.h"
00075 #include "peerstatemachine.h"
00076 #include "peermanager.h"
00077 #include "config.h"
00078
00079 #include "receiver.h"
00080
00081 extern dp_config *config;
00083 int dp_add_pid(pid_t pid);
00084 void dp_del_pid(pid_t pid);
00085
00086 void receive_loop(int sock);
00087
00088 void receive_message(AAAMessage *msg,int sock);
00089
00090 peer *this_peer=0;
00093 #define PIPE_PREFIX "/tmp/cdp_send_"
00094 int pipe_fd;
00095 int pipe_fd_out;
00096 str pipe_name;
00101 static inline void set_peer_pipe()
00102 {
00103 if (!this_peer) return;
00104
00105 this_peer->send_pipe = pipe_name;
00106
00107 }
00108
00114 void receiver_init(int sock,peer *p)
00115 {
00116 this_peer = p;
00117
00118 pipe_name.s = shm_malloc(sizeof(PIPE_PREFIX)+64);
00119 sprintf(pipe_name.s,"%s%d_%d_%d",PIPE_PREFIX,(unsigned int) time(0),sock,getpid());
00120 pipe_name.len = strlen(pipe_name.s);
00121
00122 set_peer_pipe();
00123
00124 mkfifo(pipe_name.s, 0666);
00125 pipe_fd = open(pipe_name.s, O_RDONLY | O_NDELAY);
00126 if (pipe_fd<0){
00127 LOG(L_ERR,"ERROR:receiver_process(): FIFO open failed > %s\n",strerror(errno));
00128 }
00129
00130 pipe_fd_out = open(pipe_name.s, O_WRONLY);
00131 }
00132
00138 void receiver_process(int sock)
00139 {
00140 LOG(L_INFO,"INFO:receiver_process(): [%d] Receiver process starting up...\n",sock);
00141
00142
00143 receive_loop(sock);
00144 LOG(L_INFO,"INFO:receiver_process(): [%d]... Receiver process cleaning-up.\n",sock);
00145 close(sock);
00146 close(pipe_fd);
00147 close(pipe_fd_out);
00148 remove(pipe_name.s);
00149 if (this_peer){
00150 lock_get(this_peer->lock);
00151 this_peer->send_pipe.s=0;
00152 this_peer->send_pipe.len=0;
00153 lock_release(this_peer->lock);
00154 }
00155 shm_free(pipe_name.s);
00156
00157
00158 dp_del_pid(getpid());
00159
00160 #ifdef CDP_FOR_SER
00161 drop_my_process();
00162 #else
00163 #ifdef PKG_MALLOC
00164 #ifdef PKG_MALLOC
00165 LOG(memlog, "Receiver[%d] Memory status (pkg):\n",sock);
00166
00167 #ifdef pkg_sums
00168 pkg_sums();
00169 #endif
00170 #endif
00171 #endif
00172 #endif
00173
00174 LOG(L_INFO,"INFO:receiver_process(): [%d]... Receiver process finished.\n",sock);
00175 exit(0);
00176 }
00177
00187 static inline int select_recv(int s,void * buf,int len,int opt)
00188 {
00189 fd_set rfds,efds;
00190 struct timeval tv;
00191 int n,max;
00192 AAAMessage *msg=0;
00193 int r=0,cnt=0;
00194
00195
00196 max = s;
00197 if (pipe_fd>max) max = pipe_fd;
00198 n = 0;
00199
00200 while(!n){
00201 if (shutdownx&&*shutdownx) break;
00202
00203 FD_ZERO(&rfds);
00204 FD_SET(s,&rfds);
00205 FD_SET(pipe_fd,&rfds);
00206 FD_ZERO(&efds);
00207 FD_SET(s,&efds);
00208 tv.tv_sec=1;
00209 tv.tv_usec=0;
00210
00211
00212
00213 n = select(max+1,&rfds,0,&efds,&tv);
00214 if (n==-1){
00215 if (shutdownx&&*shutdownx) return -1;
00216 LOG(L_ERR,"ERROR:select_recv(): %s\n",strerror(errno));
00217 return -1;
00218 }else
00219 if (n){
00220 if (FD_ISSET(s,&efds)) return -1;
00221 if (FD_ISSET(pipe_fd,&rfds)) {
00222 LOG(L_DBG,"DBG:select_recv(): There is something on the pipe\n");
00223 cnt = read(pipe_fd,&msg,sizeof(AAAMessage *));
00224 LOG(L_DBG,"DBG:select_recv(): Pipe says [%p] %d\n",msg,cnt);
00225 if (cnt==0){
00226
00227 LOG(L_INFO,"INFO:select_recv(): ReOpening pipe for read. This should not happen...\n");
00228 close(pipe_fd);
00229 pipe_fd = open(pipe_name.s, O_RDONLY | O_NDELAY);
00230 goto receive;
00231 }
00232 if (cnt<sizeof(AAAMessage *)){
00233 if (cnt<0) LOG(L_ERR,"ERROR:select_recv(): Error reading from pipe\n");
00234 r = -1;
00235 goto receive;
00236 }
00237
00238 while( (cnt=write(s,msg->buf.s,msg->buf.len))==-1 ) {
00239 if (errno==EINTR)
00240 continue;
00241 LOG(L_ERR,"ERROR:select_recv(): write returned error> %s\n",
00242 strerror(errno));
00243 close(s);
00244 AAAFreeMessage(&msg);
00245 r = -1;
00246 return r;
00247 }
00248
00249 if (cnt!=msg->buf.len){
00250 LOG(L_ERR,"ERROR:select_recv(): only wrote %d/%d bytes\n",cnt,msg->buf.len);
00251 close(s);
00252 AAAFreeMessage(&msg);
00253 r = -1;
00254 return r;
00255 }
00256 AAAFreeMessage(&msg);
00257
00258 }
00259 receive:
00260 if (FD_ISSET(s,&rfds)) {
00261 cnt = recv(s,buf,len,opt);
00262 if (cnt==0) return -1;
00263 else return cnt;
00264 }
00265 }
00266
00267 }
00268 return r;
00269 }
00270
00272 #define hdr_len 20
00273
00280 void receive_loop(int sock)
00281 {
00282 char buf[hdr_len],*msg;
00283 int buf_len,length,version,cnt,msg_len;
00284 AAAMessage *dmsg;
00285
00286 while(!*shutdownx){
00287 buf_len=0;
00288 while(buf_len<1){
00289 cnt = select_recv(sock,buf+buf_len,1,0);
00290 if (cnt<0) goto error;
00291 buf_len+=cnt;
00292 }
00293 version = (unsigned char)buf[0];
00294 if (version!=1) {
00295 LOG(L_ERR,"ERROR:receive_loop():[%d] Recv Unknown version [%d]\n",sock,(unsigned char)buf[0]);
00296 continue;
00297 }
00298 while(buf_len<hdr_len){
00299 cnt = select_recv(sock,buf+buf_len,hdr_len-buf_len,0);
00300 if (cnt<0) goto error;
00301 buf_len+=cnt;
00302 }
00303 length = get_3bytes(buf+1);
00304 if (length>DP_MAX_MSG_LENGTH){
00305 LOG(L_ERR,"ERROR:receive_loop():[%d] Msg too big [%d] bytes\n",sock,length);
00306 goto error;
00307 }
00308 LOG(L_DBG,"DBG:receive_loop():[%d] Recv Version %d Length %d\n",sock,version,length);
00309 msg = shm_malloc(length);
00310 if (!msg) {
00311 LOG_NO_MEM("shm",length);
00312 goto error;
00313 }
00314
00315 memcpy(msg,buf,hdr_len);
00316 msg_len=hdr_len;
00317 while(msg_len<length){
00318 cnt = select_recv(sock,msg+msg_len,length-msg_len,0);
00319 if (cnt<0) {
00320 shm_free(msg);
00321 goto error;
00322 }
00323 msg_len+=cnt;
00324 }
00325 LOG(L_DBG,"DBG:receive_loop():[%d] Recv message complete\n",sock);
00326
00327 dmsg = AAATranslateMessage((unsigned char*)msg,(unsigned int)msg_len,1);
00328
00329
00330
00331 if (dmsg) receive_message(dmsg,sock);
00332 else{
00333 shm_free(msg);
00334 }
00335
00336 }
00337 error:
00338 if (this_peer) {
00339 if (this_peer->I_sock == sock) sm_process(this_peer,I_Peer_Disc,0,0,sock);
00340 if (this_peer->R_sock == sock) sm_process(this_peer,R_Peer_Disc,0,0,sock);
00341 }
00342 LOG(L_ERR,"INFO:receive_loop():[%d] Client closed connection or error... BYE\n",sock);
00343 }
00344
00350 int peer_connect(peer *p)
00351 {
00352 int sock;
00353 int pid;
00354 unsigned int option = 1;
00355
00356 struct addrinfo *ainfo=0,*res=0,hints;
00357 char buf[256],host[256],serv[256];
00358 int error;
00359
00360 memset (&hints, 0, sizeof(hints));
00361
00362
00363 hints.ai_flags = AI_ADDRCONFIG;
00364 hints.ai_socktype = SOCK_STREAM;
00365
00366 sprintf(buf,"%d",p->port);
00367
00368 error = getaddrinfo(p->fqdn.s, buf, &hints, &res);
00369
00370 if (error!=0){
00371 LOG(L_WARN,"WARNING:peer_connect(): Error opening connection to %.*s:%d >%s\n",
00372 p->fqdn.len,p->fqdn.s,p->port,gai_strerror(error));
00373 goto error;
00374 }
00375
00376 for(ainfo = res;ainfo;ainfo = ainfo->ai_next)
00377 {
00378 if (getnameinfo(ainfo->ai_addr,ainfo->ai_addrlen,
00379 host,256,serv,256,NI_NUMERICHOST|NI_NUMERICSERV)==0){
00380 LOG(L_WARN,"INFO:peer_connect(): Trying to connect to %s port %s\n",
00381 host,serv);
00382 }
00383
00384 if ((sock = socket(ainfo->ai_family, ainfo->ai_socktype, ainfo->ai_protocol)) == -1) {
00385 LOG(L_ERR,"ERROR:create_socket(): error creating client socket to %s port %s >"
00386 " %s\n",host,serv,strerror(errno));
00387 continue;
00388 }
00389
00390 if (connect(sock,ainfo->ai_addr,ainfo->ai_addrlen)!=0) {
00391 LOG(L_WARN,"WARNING:peer_connect(): Error opening connection to to %s port %s >%s\n",
00392 host,serv,strerror(errno));
00393 close(sock);
00394 continue;
00395 }
00396
00397 setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
00398
00399 LOG(L_INFO,"INFO:peer_connect(): Peer %.*s:%d connected\n",p->fqdn.len,p->fqdn.s,p->port);
00400
00401
00402 receiver_init(sock,p);
00403
00404 #ifdef CDP_FOR_SER
00405 pid = fork_process(p->port,"receiver I",0);
00406 #else
00407 pid = fork();
00408 #endif
00409 if (pid<0){
00410 LOG(L_ERR,"ERROR:peer_connect(): fork() failed > %s\n",strerror(errno));
00411 goto error;
00412 }
00413 if (pid==0){
00414
00415 receiver_process(sock);
00416 LOG(L_CRIT,"ERROR:peer_connect(): receiver_process finished without exit!\n");
00417 exit(-1);
00418 }else{
00419
00420 LOG(L_INFO,"INFO:peer_connect(): Receiver process forked [%d]\n",pid);
00421
00422 dp_add_pid(pid);
00423 }
00424
00425 if (res) freeaddrinfo(res);
00426 return sock;
00427 }
00428 error:
00429 if (res) freeaddrinfo(res);
00430 return -1;
00431 }
00432
00433
00444 int peer_send_msg(peer *p,AAAMessage *msg)
00445 {
00446 int fd,n;
00447 if (!AAABuildMsgBuffer(msg)) return 0;
00448 if (!p->send_pipe.s) {
00449 LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s has no attached send pipe\n",p->fqdn.len,p->fqdn.s);
00450 return 0;
00451 }
00452 fd = open(p->send_pipe.s,O_WRONLY);
00453 if (fd<0){
00454 LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s error on pipe open > %s\n",p->fqdn.len,p->fqdn.s,strerror(errno));
00455 return 0;
00456 }
00457 LOG(L_DBG,"DBG:peer_send_msg(): Pipe push [%p]\n",msg);
00458 n = write(fd,&msg,sizeof(AAAMessage *));
00459 if (n<0) {
00460 LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s error on pipe write > %s\n",p->fqdn.len,p->fqdn.s,strerror(errno));
00461 close(fd);
00462 return 0;
00463 }
00464 if (n!=sizeof(AAAMessage *)) {
00465 LOG(L_ERR,"ERROR:peer_send_msg(): Peer %.*s error on pipe write > only %d bytes written\n",p->fqdn.len,p->fqdn.s,n);
00466 close(fd);
00467 return 0;
00468 }
00469 close(fd);
00470 return 1;
00471 }
00472
00483 int peer_send(peer *p,int sock,AAAMessage *msg,int locked)
00484 {
00485 int n;
00486
00487
00488 if (!p||!msg||sock<0) return 0;
00489
00490 if (!AAABuildMsgBuffer(msg)) return 0;
00491
00492 if (!locked) lock_get(p->lock);
00493
00494 while( (n=write(sock,msg->buf.s,msg->buf.len))==-1 ) {
00495 if (errno==EINTR)
00496 continue;
00497 LOG(L_ERR,"ERROR:peer_send(): write returned error: %s\n",
00498 strerror(errno));
00499 if (p->I_sock==sock) sm_process(p,I_Peer_Disc,0,1,p->I_sock);
00500 if (p->R_sock==sock) sm_process(p,R_Peer_Disc,0,1,p->R_sock);
00501 if (!locked) lock_release(p->lock);
00502 AAAFreeMessage(&msg);
00503 return 0;
00504 }
00505
00506 if (n!=msg->buf.len){
00507 LOG(L_ERR,"ERROR:peer_send(): only wrote %d/%d bytes\n",n,msg->buf.len);
00508 if (!locked) lock_release(p->lock);
00509 AAAFreeMessage(&msg);
00510 return 0;
00511 }
00512 if (!locked) lock_release(p->lock);
00513 AAAFreeMessage(&msg);
00514 return 1;
00515 }
00516
00517
00524 void receive_message(AAAMessage *msg,int sock)
00525 {
00526 AAA_AVP *avp1,*avp2;
00527 LOG(L_DBG,"DBG:receive_message(): [%d] Recv msg %d\n",sock,msg->commandCode);
00528
00529 if (!this_peer) {
00530 this_peer = get_peer_from_sock(sock);
00531 set_peer_pipe();
00532 }
00533
00534 if (!this_peer){
00535 switch (msg->commandCode){
00536 case Code_CE:
00537 if (is_req(msg)){
00538 avp1 = AAAFindMatchingAVP(msg,msg->avpList.head,AVP_Origin_Host,0,0);
00539 avp2 = AAAFindMatchingAVP(msg,msg->avpList.head,AVP_Origin_Realm,0,0);
00540 if (avp1&&avp2){
00541 this_peer = get_peer_from_fqdn(avp1->data,avp2->data);
00542 }
00543 if (!this_peer) {
00544 LOG(L_ERR,"ERROR:receive_msg(): Received CER from unknown peer (accept unknown=%d) -ignored\n",
00545 config->accept_unknown_peers);
00546 AAAFreeMessage(&msg);
00547 }else{
00548 set_peer_pipe();
00549 sm_process(this_peer,R_Conn_CER,msg,0,sock);
00550 }
00551 }
00552 else{
00553 LOG(L_ERR,"ERROR:receive_msg(): Received CEA from an unknown peer -ignored\n");
00554 AAAFreeMessage(&msg);
00555 }
00556 break;
00557 default:
00558 LOG(L_ERR,"ERROR:receive_msg(): Received non-CE from an unknown peer -ignored\n");
00559 AAAFreeMessage(&msg);
00560 }
00561 }else{
00562 touch_peer(this_peer);
00563 switch (this_peer->state){
00564 case Wait_I_CEA:
00565 if (msg->commandCode!=Code_CE||is_req(msg)){
00566 sm_process(this_peer,I_Rcv_Non_CEA,msg,0,sock);
00567 }else
00568 sm_process(this_peer,I_Rcv_CEA,msg,0,sock);
00569 break;
00570 case I_Open:
00571 switch (msg->commandCode){
00572 case Code_CE:
00573 if (is_req(msg)) sm_process(this_peer,I_Rcv_CER,msg,0,sock);
00574 else sm_process(this_peer,I_Rcv_CEA,msg,0,sock);
00575 break;
00576 case Code_DW:
00577 if (is_req(msg)) sm_process(this_peer,I_Rcv_DWR,msg,0,sock);
00578 else sm_process(this_peer,I_Rcv_DWA,msg,0,sock);
00579 break;
00580 case Code_DP:
00581 if (is_req(msg)) sm_process(this_peer,I_Rcv_DPR,msg,0,sock);
00582 else sm_process(this_peer,I_Rcv_DPA,msg,0,sock);
00583 break;
00584 default:
00585 sm_process(this_peer,I_Rcv_Message,msg,0,sock);
00586 }
00587 break;
00588 case R_Open:
00589 switch (msg->commandCode){
00590 case Code_CE:
00591 if (is_req(msg)) sm_process(this_peer,R_Rcv_CER,msg,0,sock);
00592 else sm_process(this_peer,R_Rcv_CEA,msg,0,sock);
00593 break;
00594 case Code_DW:
00595 if (is_req(msg)) sm_process(this_peer,R_Rcv_DWR,msg,0,sock);
00596 else sm_process(this_peer,R_Rcv_DWA,msg,0,sock);
00597 break;
00598 case Code_DP:
00599 if (is_req(msg)) sm_process(this_peer,R_Rcv_DPR,msg,0,sock);
00600 else sm_process(this_peer,R_Rcv_DPA,msg,0,sock);
00601 break;
00602 default:
00603 sm_process(this_peer,R_Rcv_Message,msg,0,sock);
00604 }
00605 break;
00606 default:
00607 LOG(L_ERR,"ERROR:receive_msg(): Received msg while peer in state %d -ignored\n",this_peer->state);
00608 AAAFreeMessage(&msg);
00609 }
00610 }
00611 }