Страниц: [1]
  Печать  
Автор Тема: Многопоточный TCP сервер и однопоточный TCP клиент: пару вопросов  (Прочитано 11641 раз)
Fregl
Sr. Member
****
Offline Offline

Сообщений: 396


Просмотр профиля
« : Мая 18, 2010, 05:07:46 pm »

Столкнулся с такой непонятной для меня вещью:
Есть многопоточный сервер, в котором в одном потоке создается сокет для прослушки, а далее в пуле потоков собственно идет прием подключений от клиентов и обработка данных от этих же клиентов.
Код:
THREAD_POOL_PARAM_T * server_t::__sock_block(THREAD_POOL_PARAM_T * ctp) //здесь принимаю очередное подключение от клиента
{
// printf("__sock_block(0x%X) waiting for client\n",ctp);
client_t * client = (client_t*)ctp;
server_t * server = (server_t*)client->server_ptr;
client->sockaddr_len = sizeof(client->addr);
if((client->fd = accept(server->fd,(struct sockaddr *)&client->addr,&client->sockaddr_len))==-1)
{
if(server->on_accept_error) server->on_accept_error(server,errno);
}
// printf("__sock_block(0x%X) accepted\n",ctp);
return (THREAD_POOL_PARAM_T*)ctp;
}

int server_t::__sock_handler(THREAD_POOL_PARAM_T * ctp) //петля опроса клиента
{
puts(__func__);
client_t * client = (client_t*)ctp;
server_t * server = (server_t*)client->server_ptr;
int read_bytes;
if(client->fd>=0)
{
while(1)
{
// read_bytes = recv(client->fd,client->read_buffer,server->read_buffer_size,0);
read_bytes = read(client->fd,client->read_buffer,server->read_buffer_size); //<<<<все время получаю 0 байт после подключения клиента!
printf("read_bytes %i\n",read_bytes);
/*
if(read_bytes==0)
{
if(server->on_disconnected) server->on_disconnected(server,client);
close(client->fd);
client->fd=-1;
return 0;
}
else
if(read_bytes==-1)
{
if(server->on_client_error) server->on_client_error(server,client,errno);
close(client->fd);
client->fd=-1;
return 0;
}
else
if(server->on_received) server->on_received(server,client,read_bytes);
*/

}
}
return 0;
}

Клиент упрощенно выглядит так (в другой программе):
Код:
void * socket_t::read_func(void * p)
{
socket_t * a_socket = (socket_t*)p;
ssize_t received = 0;
a_socket->recv_buffer = malloc(a_socket->recv_buffer_size);
uint32_t reconnect_count = a_socket->reconnect_count;
pthread_cleanup_push(&socket_t::on_cancel,a_socket);
while(1)
{
if(a_socket->status==SOCKET_CONNECTED)
{
//received = recv(a_socket->fd,a_socket->recv_buffer,a_socket->recv_buffer_size,0);
received = read(a_socket->fd,a_socket->recv_buffer,a_socket->recv_buffer_size);
printf("received %i\n",received);
if(received>0)
{
if(a_socket->on_received) a_socket->on_received(a_socket,a_socket->recv_buffer,received);
continue;
}
if(received==0 && a_socket->on_disconnected) a_socket->on_disconnected(a_socket);
if(received<0 && a_socket->on_error)
{
printf("sock read error : %s\n",sys_errlist[errno]);
a_socket->on_error(a_socket);
}
a_socket->status = SOCKET_NOT_OK;
}
else
{
socket_close(a_socket); //закрываем сокет
/*
if(a_socket->reconnect_count)
{
if(!reconnect_count) pthread_exit(NULL); //нормальное завершение
else
reconnect_count--;
}
*/
delay(a_socket->reconnect_time); //спим
if((a_socket->status = socket_init(a_socket))!=SOCKET_OK) return (void*)a_socket->status; //ошибка инициализации сокета
if((a_socket->status = socket_open(a_socket))==SOCKET_CONNECTED)
{
if(a_socket->on_connected) a_socket->on_connected(a_socket);
}
else
{
if(a_socket->on_error) a_socket->on_error(a_socket);
}
}
pthread_testcancel();
}
    pthread_cleanup_pop(1);
}

Так вот столкнулся с такой вещью: после подключения клиента к серверу, на стороне сервера в петле опроса клиента в read(recv) в результате всегда получаю ноль, как будто клиент отключился. Я так понимаю серверный сокет должен блокироваться на read(recv) до получения каких либо данных со стороны клиента (когда вызывать send|write)? Никаких особых настроек сокетов не производилось (только настроены таймауты приема отправки на 2 секунды макс.).
У клиента тоже петля опроса даннхы с сервера как показано выше. В тонкостях сокетов не силен, если что упустил подскажите куда копать? (не до забора Smiley ).
« Последнее редактирование: Мая 18, 2010, 05:10:33 pm от Fregl » Записан
Fregl
Sr. Member
****
Offline Offline

Сообщений: 396


Просмотр профиля
« Ответ #1 : Мая 18, 2010, 06:19:38 pm »

а вот вариант однопоточного сервера, который работает:
Код:
void * server_func(void * p)
{
server_t * s = (server_t*)p;
void * read_buffer = malloc(s->read_buffer_size);
struct sockaddr_in addr={0};
socklen_t sockaddr_len = sizeof(&addr);
int fd,read_bytes;

if((fd = accept(s->fd,(struct sockaddr *)&addr,&sockaddr_len))==-1)
{
if(s->on_accept_error) s->on_accept_error(s,errno);
}
else
{
if(s->on_accept) s->on_accept(s,NULL);
}
while(1)
{
printf("reading from socket %i\n",fd);
read_bytes = recv(fd,read_buffer,s->read_buffer_size,0);
printf("read_bytes %i\n",read_bytes);
if(read_bytes==0)
{
if(s->on_disconnected) s->on_disconnected(s,NULL);
break;
}
else
if(read_bytes==-1)
{
if(s->on_client_error) s->on_client_error(s,NULL,errno);
break;
}
else
if(s->on_received) s->on_received(s,NULL,read_bytes);
}
close(fd);
return NULL;
}
Так по сути ничего не изменилось ...
Записан
Fregl
Sr. Member
****
Offline Offline

Сообщений: 396


Просмотр профиля
« Ответ #2 : Мая 18, 2010, 06:35:47 pm »

нет, и так не работает Sad.
Записан
ed1k
QOR.Moderator
*****
Offline Offline

Сообщений: 739


Просмотр профиля WWW
« Ответ #3 : Мая 18, 2010, 11:02:18 pm »

К сожалению, из вашего кода не видно как вы создаете сокет, инициализируете его, привязываете (bind) и слушаете очередь сообщений (listen)- а без этого ваш accept малоинформативен. Клиент тоже весьма загадочный - я не заметил, чтобы он чтото передавал серверу.
Записан
Fregl
Sr. Member
****
Offline Offline

Сообщений: 396


Просмотр профиля
« Ответ #4 : Мая 19, 2010, 01:48:18 pm »

Вот серверная часть (инициализация, запуск ...):
Код:
int init_server(server_t * a_server)
{
if((a_server->fd = socket(PF_INET,SOCK_STREAM,0))==-1)
{
printf("socket() error : %s",sys_errlist[errno]);
return errno;
}

struct sockaddr_in sockaddr={0};
sockaddr.sin_family = AF_INET;
sockaddr.sin_port = htons(a_server->port);
sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);

if(bind(a_server->fd,(const struct sockaddr *)&sockaddr,sizeof(sockaddr))==-1)
{
printf("bind() error : %s",sys_errlist[errno]);
return errno;
}
if(listen(a_server->fd,5)==-1)
{
printf("listen() error : %s",sys_errlist[errno]);
return errno;
}

a_server->server_tid = -1;

return 0;
}

void free_server(server_t * a_server)
{
stop_server(a_server);
if(a_server->fd>=0) close(a_server->fd);
}

int start_server(server_t * a_server)
{
if(a_server->server_tid==-1)
{
pthread_create(&a_server->server_tid,NULL,server_func,a_server);
}
return 0;
}

void stop_server(server_t * a_server)
{
if(a_server->server_tid!=-1)
{
pthread_cancel(a_server->server_tid);
pthread_join(a_server->server_tid,NULL);
a_server->server_tid=-1;
}
}

Клиент может передавать сообщения не часто, по мере изменений в обрабатываемых данных... вот основная часть клиента
Код:
int socket_init(socket_t * a_socket) //инициализация
{
if((a_socket->fd=socket(PF_INET,SOCK_STREAM,0))==-1)
return a_socket->status = SOCKET_INIT_FAIL;
socket_init_params(a_socket);
return a_socket->status = SOCKET_OK;
}

void socket_init_params(socket_t * a_socket)
{
//инициализация таймаутов сокета
struct timeval time_val_send;
struct timeval time_val_recv;
time_val_send.tv_sec = a_socket->write_timeout_msec/1000;
time_val_send.tv_usec = (uint64_t)(a_socket->write_timeout_msec%1000)*1000LL;
time_val_recv.tv_sec = a_socket->read_timeout_msec/1000;
time_val_recv.tv_usec = (uint64_t)(a_socket->read_timeout_msec%1000)*1000LL;
// setsockopt(a_socket->fd,SOL_SOCKET,SO_SNDTIMEO,&time_val_send,sizeof(time_val_send));
// setsockopt(a_socket->fd,SOL_SOCKET,SO_RCVTIMEO,&time_val_recv,sizeof(time_val_recv));
}

int socket_open(socket_t * a_socket) //открытие (соединение)
{
if(!a_socket->host) return SOCKET_NULL_HOST;

struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(a_socket->port);
if(inet_aton(a_socket->host,&addr.sin_addr)==0)
return a_socket->status = SOCKET_WRONG_HOST;

uint64_t connect_timeout = (uint64_t)a_socket->connect_timeout_msec*1000LL;
struct sigevent connect_timeout_event;
SIGEV_UNBLOCK_INIT(&connect_timeout_event);

int socket_error = ETIMEDOUT;
TimerTimeout(CLOCK_MONOTONIC, _NTO_TIMEOUT_SEND | _NTO_TIMEOUT_REPLY, &connect_timeout_event,&connect_timeout,NULL);
socket_error = connect(a_socket->fd,(struct sockaddr*)&addr,sizeof(addr));
if((socket_error==0) || (socket_error==EISCONN)) return SOCKET_CONNECTED;
return a_socket->status = SOCKET_CONNECT_FAIL;
}

void socket_close(socket_t * a_socket)
{
if(a_socket->fd>=0)
{
shutdown(a_socket->fd,SHUT_RD);
close(a_socket->fd);
a_socket->fd = -1;
// a_socket->status = SOCKET_CLOSED;
}
}

int socket_run(socket_t * a_socket)
{
if(a_socket->status != SOCKET_OK)
return SOCKET_NOT_OK;

if(!a_socket->tid)
{
pthread_create(&a_socket->tid,NULL,&socket_t::read_func,(void*)a_socket);
// pthread_join(a_socket->tid,NULL);
return SOCKET_OK;
}
return SOCKET_ALREADY_RUN;
}

ssize_t socket_write(socket_t * a_socket, void * buffer, size_t size)
{
ssize_t sent;
if(a_socket->fd!=-1 && a_socket->status==SOCKET_CONNECTED)
switch(sent = send(a_socket->fd,buffer,size,0))
{
case 0: if(a_socket->on_disconnected) a_socket->on_disconnected(a_socket); break;
case -1: return SOCKET_SEND_FAIL; break;
default: return sent; break;
}
}

void socket_stop(socket_t * a_socket)
{
if(!a_socket->tid)
{
pthread_cancel(a_socket->tid);
pthread_join(a_socket->tid,NULL);
a_socket->tid=-1;
}
}

void socket_free(socket_t * a_socket)
{
socket_stop(a_socket);
socket_close(a_socket);
if(a_socket->recv_buffer)
{
free(a_socket->recv_buffer);
a_socket->recv_buffer = NULL;
}
}

void * socket_t::read_func(void * p)
{
socket_t * a_socket = (socket_t*)p;
ssize_t received = -1;
a_socket->recv_buffer = malloc(a_socket->recv_buffer_size);
uint32_t reconnect_count = a_socket->reconnect_count;
pthread_cleanup_push(&socket_t::on_cancel,a_socket);
while(1)
{
if(a_socket->status==SOCKET_CONNECTED)
{
received = recv(a_socket->fd,a_socket->recv_buffer,a_socket->recv_buffer_size,0);
// printf("received %i\n",received);
if(received>0)
{
if(a_socket->on_received) a_socket->on_received(a_socket,a_socket->recv_buffer,received);
continue;
}
if(received==0 && a_socket->on_disconnected) a_socket->on_disconnected(a_socket);
if(received<0 && a_socket->on_error)
{
printf("sock read error : %s\n",sys_errlist[errno]);
a_socket->on_error(a_socket);
}
a_socket->status = SOCKET_NOT_OK;
}
else
{
socket_close(a_socket); //закрываем сокет
/*
if(a_socket->reconnect_count)
{
if(!reconnect_count) pthread_exit(NULL); //нормальное завершение
else
reconnect_count--;
}
*/
delay(a_socket->reconnect_time); //спим
if((a_socket->status = socket_init(a_socket))!=SOCKET_OK) return (void*)a_socket->status; //ошибка инициализации сокета
if((a_socket->status = socket_open(a_socket))==SOCKET_CONNECTED)
{
if(a_socket->on_connected) a_socket->on_connected(a_socket);
}
else
{
if(a_socket->on_error) a_socket->on_error(a_socket);
}
}
pthread_testcancel();
}
    pthread_cleanup_pop(1);
}
В отдельном потоке по таймеру или изменению отправляются данные на сервер (максимум 2 кб)...
Записан
Fregl
Sr. Member
****
Offline Offline

Сообщений: 396


Просмотр профиля
« Ответ #5 : Мая 19, 2010, 04:11:05 pm »

как всегда решилось элементарно: забыл указать размер приемного буфера (приходил ноль), все заработало!
Записан
qnxloder
Sr. Member
****
Offline Offline

Сообщений: 292


Просмотр профиля
« Ответ #6 : Мая 19, 2010, 04:19:42 pm »

еще надо:
sockaddr.sin_len          = sizeof( sockaddr );

а в клиентксой части struct sockaddr_in sockaddr={0};
Записан
Страниц: [1]
  Печать  
 
Перейти в: