Делюсь своим творчеством, может кому-то поможет. В аттаче исходники для MSVC2008, думаю под линуксом тоже соберется, не тестил, но копипастил из проекта собирающегося в линуксе и по максимуму использовал стандартный Си:
udp_dialog.h - заготовка для асинхронного обмена по UDP с приоритетом чтению входящих пакетов. Это основа на которой работает все ниже перечисленное.
udpeer_server.cpp - простейший сервер для обмена адресами двух клиентов и для замера скорости передачи на сервер.
udpeer.cpp - простейший клиент для p2p обмена. Два клиента обмениваются адресами через сервер и дальше шлют данные друг-другу напрямую.
udp_speed.cpp - замер скорости отправки на сервер
Понимаю что многие считают меня изобретателем велосипеда, когда уже есть libevent, boost::asio, ZMQ и т.д. и т.п. Перечисленные изучал, кое что с их использованием написал. В итоге пришел к выводу что лучше сделать самому с нуля, тем более что задача у меня достаточно простая: дать клиентам возможность в онлайне обмениваться короткими сообщениями и иногда пересылать файлы 10-200 кб, желательно напрямую, а не через сервер. И протокол UDP элементарный, чем удобен и понятен: либо пришел пакет, либо не пришел, но весь пакет целиком, а не какой-то кусок TCP-потока.
Хочу услышать критику по основе всего обмена (в сервере и клиенте). Большого опыта разработки на Си нет, поэтому мог и накосячить по незнанию.
udp_dialog.h
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
// udp_dialog.h
// заготовка для асинхронного обмена по UDP с приоритетом чтению входящих пакетов
// ВАЖНО: для Windows делать #include "udp_dialog.h" перед #include <windows.h>
//-------------------------------------------------------------------------------
// Для запуска цикла приема/обработки/отправки вызывать
int udp_dialog(sockaddr_in* addr = NULL, int buf_count = 1024);
// addr - IP:порт интерфейса который использовать. NULL на выбор ОС
// buf_count - количество буферов приема (минимум можно 2)
// возвращает 0 если завершилась успешно, иначе код ошибки
//-------------------------------------------------------------------------------
// далее в своем коде необходимо прописать две callback-функции:
// 1. обработчик входящих сообщений
int parse_udp_message(char* msg, int size, sockaddr_in& from);
// msg - содержимое сообщения
// size - размер сообщения
// from - адрес отправителя
// возвращает:
// 0 - обработано
// <0 - завершить работу udp_dialog()
// >0 - размер сообщения для отправки, при этом
// msg - содержимое сообщения
// from - адрес получателя
//
// 2. отправка сообщений по собственной инициативе
int send_udp_message(char* msg, sockaddr_in& send_to, int& sleep_msec);
// msg - содержимое сообщения
// send_to - адрес получателя
// sleep_msec - максимальная пауза в миллисекундах до следующего вызова
// учитывается только в случае если нет сообщения на отправку
// возвращает:
// 0 - отправлять нечего
// <0 - завершить работу udp_dialog()
// >0 - размер сообщения для отправки
//-------------------------------------------------------------------------------
// Максимальный размер пакета для приема/отправки
#define UDP_MAX_SIZE 1372
// учитывать при заполнении msg и не выходить за границу msg[UDP_MAX_SIZE]
//-------------------------------------------------------------------------------
// Минимальный размер пакета, меньше которого игнорировать
#define UDP_MIN_SIZE 1
// Буфер сообщения
typedef struct {
sockaddr_in addr; // адрес получателя или отправителя
int size; // размер
char msg[UDP_MAX_SIZE]; // содержимое
} buffer_t;
// переход к следующему буферу
// шаг -1 чтобы обработчики портили следующий (ненужный) буфер выходом за пределы msg[]
// ptr - указатель который меняем
// first - первый, last - последний элемент массива
// stop - элемент который нельзя обгонять
// возвращает true догнали stop
bool next_buffer(buffer_t** ptr, buffer_t* first, buffer_t* last, buffer_t* stop = NULL)
{
if(*ptr == first) {
*ptr = last;
} else {
(*ptr)--;
}
if(*ptr == stop) {
if(*ptr == last) {
*ptr = first;
} else {
(*ptr)++;
}
return true;
}
return false;
}
// Возврашает 0 если нормально или код ошибки
int udp_dialog(sockaddr_in* addr, int buf_count)
{
int err = 0;
// инициализация сокета
#if defined WIN32
WSADATA wsaData;
if(WSAStartup(MAKEWORD(2, 2), &wsaData) != NO_ERROR) {
err = 1;
}
#endif
SOCKET sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (!err && sock == -1) {
err = 2;
}
if(!err) {
if(addr) {
if(bind(sock, (SOCKADDR *) addr, (socklen_t) sizeof(sockaddr_in)) != 0) {
err = 3;
}
} else {
sockaddr_in addr_any;
addr_any.sin_family = AF_INET;
addr_any.sin_port = 0; //htons(0);
addr_any.sin_addr.s_addr = 0; //htonl(INADDR_ANY);
if(bind(sock, (SOCKADDR *) &addr_any, (socklen_t) sizeof(sockaddr_in)) != 0) {
err = 3;
}
}
}
// выделение памяти под буферы приема
buffer_t* buffers = NULL;
if(!err && buf_count > 0) {
buffers = (buffer_t*) calloc(buf_count, sizeof(buffer_t));
if(!buffers) {
err = 4;
}
}
// прием и обработка сообщений
if(!err) {
buffer_t* buf_end = buffers + buf_count - 1; // последний буфер
buffer_t* buf_read = buf_end; // буфер для приема
buffer_t* buf_parse = buffers; // буфер для обработки
bool queue_empty = true;
timeval wait_time;
wait_time.tv_sec = 0;
wait_time.tv_usec = 0;
// Ожидание сообщений
while(true) {
// проверка наличия входящего сообщания
fd_set fd;
FD_ZERO(&fd);
FD_SET(sock, &fd);
int sel = select(sock + 1, &fd, NULL, NULL, &wait_time);
wait_time.tv_usec = 1; // если поставить 0 не успевают отправляться
if(sel == -1) {// SOCKET_ERROR
err = 5;
break;
} else if(sel == 0 || buf_read == buf_parse) { // принимать нечего или некуда
int size_to_send = 0;
if(buf_parse->size != 0) {
// обработка сообщения
size_to_send = parse_udp_message(buf_parse->msg, buf_parse->size, buf_parse->addr);
buf_parse->size = 0; // пометка буфера пустым
} else {
// отправка по собственной инициативе
int sleep_msec = 100; // пауза 0.1 сек если отправитель не изменит
size_to_send = send_udp_message(buf_parse->msg, buf_parse->addr, sleep_msec);
if(size_to_send == 0 && sleep_msec > 0) { // установка паузы если слать нечего
wait_time.tv_usec = sleep_msec < 1000 ? sleep_msec * 1000 : 999999;
}
}
if(size_to_send >= UDP_MIN_SIZE) {
// отправка UDP пакета
if(sendto(sock, buf_parse->msg, size_to_send, 0, (SOCKADDR *) &buf_parse->addr, sizeof(sockaddr_in)) <= 0) {
printf("sendto() error\n");
}
//Sleep(10);
} else if(size_to_send < 0) {
break; // выходим из цикла обработки сообщений
}
queue_empty = next_buffer(&buf_parse, buffers, buf_end, buf_read); // переходим к следующему буферу
} else {
// прием UDP пакета
socklen_t addr_size = sizeof(sockaddr_in);
int size = recvfrom(sock, buf_read->msg, UDP_MAX_SIZE, 0, (SOCKADDR *) &buf_read->addr, &addr_size);
if(size >= UDP_MIN_SIZE) {
// прочитано успешно
buf_read->size = size;
if(queue_empty) {
buf_parse = buf_read;
}
next_buffer(&buf_read, buffers, buf_end); // переходим к следующему буферу
}
}
}
}
// выход
if(buffers) {
free(buffers);
}
if(sock != -1) {
#if defined WIN32
closesocket(sock);
#elif defined __linux__
close(sock);
#endif
}
#if defined WIN32
WSACleanup();
#endif
return err;
}
Принцип работы вкратце следующий: чтобы не потерять ничего входящего, создается массив буферов, куда все входящие пакеты сохраняются. Когда читать нечего запускается парсер принятого, когда парсить нечего запускается опрос того что надо отправить.
На мегауниверсальность не претендую, все родилось из моей задачи, где надо:
1. ответить коротким сообщением на короткое сообщение
2. послать файл
Соответственно в первом случае при разборе сообщения (parse_udp_message()) в тот же буфер который парсится пишется ответ и возвращается размер, при необходимости можно сменить получателя, по дефолту уйдет тому кто прислал.
Во втором случае (send_udp_message()) дается буфер, который наполняется данными и задается получатель.
Получилась схема изолирующая транспорт от данных. Вся работа с сокетом собрана в udp_dialog.h, а в коде его использующем только обработка и подготовка данных. Пример:
udp_speed.cpp
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
//------------------------------------------
// Замер скорости отправки на сервер
//------------------------------------------
// Принцип работы:
// 1. Посылается серверу запрос готовности к отправке (MSG_READY_SEND)
// 2. Сервер дает команду начала отправки (MSG_START_DATA)
// 3. отправка test_window_size пакетов размера test_packet_size (MSG_DATA)
// 4. Сервер сообщает количество принятых пакетов (MSG_RECV_ALL)
// сервер сообщает время
// time_recv_init между MSG_START_DATA и первым MSG_DATA
// time_recv_all между MSG_START_DATA и последним MSG_DATA
//
// Отправитель засекает время:
// time_send_init между MSG_READY_SEND и MSG_START_DATA
// time_send_all между MSG_READY_SEND и MSG_RECV_ALL
//
// Для упрощения подсчетов все пакеты одинакового размера (test_packet_size)
//------------------------------------------
// Отладочные printf() закомментированы, для разбора работы можно включить
//------------------------------------------
// типы сообщений (1-й байт пакета)
#define MSG_READY_SEND 6 // сообщение о готовности к отправке
#define MSG_START_DATA 7 // команда на начало отправки данных
#define MSG_DATA 8 // пакет данных
#define MSG_RECV_ALL 10 // данные все приняты
// состояние работы
#define ST_START 0
#define ST_WAIT 1
#define ST_SEND 2
#define ST_WAIT_RES 3
// Количество и размер пакетов одной тестовой отправки
int test_window_size = 100;
int test_packet_size = 1024;
// общие переменные
sockaddr_in addr_server; // адрес сервера
int state = ST_START; // состояние
int time_send_init, time_send_all; // таймеры
int send_count;
// callback-функция периодически вызываемая для отправки сообщения
int send_udp_message(char* msg, sockaddr_in& send_to, int& sleep_msec)
{
switch(state) {
case ST_START: // начало работы
send_to = addr_server;
printf("%5d: start test speed server %s send %d packets of %d bytes\n", clock(), inet_ntoa(send_to.sin_addr), test_window_size, test_packet_size);
state = ST_WAIT;
*msg = MSG_READY_SEND;
time_send_init = clock();
time_send_all = clock();
return test_packet_size; // отправка MSG_READY_SEND
case ST_WAIT: // ожидание ответа сервера
state = ST_START;
sleep_msec = 1000;
return 0;
case ST_SEND: // отправка данных
if(send_count > 0) {
send_count--;
send_to = addr_server;
memcpy(msg + 1, &send_count, 4); // номер пакета
for(int i = 5; i < test_packet_size; i++) { // как бы подготовка данных
msg[i] = (i & 127);
}
//printf("%5d: Send DATA #%d\n", clock(), send_count);
*msg = MSG_DATA;
return test_packet_size; // send MSG_DATA
}
state = ST_WAIT_RES;
return 0;
case ST_WAIT_RES: // ожидание результатов
state = ST_SEND; // отправить
send_count = 1; // последний пакет
sleep_msec = 1000; // если ответа не будет 1 сек.
return 0;
}
return 0;
}
// callback-функция вызываемая для разбора принятого сообщения
int parse_udp_message(char* msg, int size, sockaddr_in& from)
{
if(size != test_packet_size) {
// неверный размер
printf("%5d: Wrong msg #%d size %d from %s:%d\n", clock(), *msg, size, inet_ntoa(from.sin_addr), ntohs(from.sin_port));
return 0;
}
switch(*msg) {
case MSG_START_DATA: // команда на отправку данных
//printf("%5d: MSG_START_DATA\n", clock());
send_count = test_window_size; // количество пакетов для отправки
time_send_init = clock() - time_send_init; // остановка time_send_init
state = ST_SEND;
return 0;
case MSG_RECV_ALL: // результат от сервера
time_send_all = clock() - time_send_all;
{
int time_recv_init, time_recv_all, data_count;
memcpy(&data_count, msg + 1, 4);
memcpy(&time_recv_init, msg + 5, 4);
memcpy(&time_recv_all, msg + 9, 4);
printf("\n%5d: Speed test result: delivered %d msg of %d (lost %d)\n", clock(), data_count, test_window_size, test_window_size - data_count);
printf(" time_send_init = %d ms\n", time_send_init);
printf(" time_recv_init = %d ms\n", time_recv_init);
printf(" time_recv_all = %d ms\n", time_recv_all);
printf(" time_send_all = %d ms\n", time_send_all);
}
return -1;
default:
printf("%5d: Unknown msg #%d size %d from %s:%d\n", clock(), *msg, size, inet_ntoa(from.sin_addr), ntohs(from.sin_port));
}
return 0;
}
int main(int argc, char* argv[])
{
addr_server.sin_family = AF_INET;
addr_server.sin_port = htons(45678);
if(argc > 1) {
addr_server.sin_addr.s_addr = inet_addr(argv[1]);
} else {
printf("\ncommand line:\nudp_speed.exe [server_ip]\n\n");
addr_server.sin_addr.s_addr = inet_addr("127.0.0.1");
}
int err = udp_dialog();
if(err) {
printf("udp_dialog() error %d\n", err);
}
system("pause");
return 0;
}
Не три строчки, но в асинхронных приложениях всегда мозг взрывается.
Нерешенные вопросы:
1. Какой взять максимальный размер пакета?
Максимальный размер 65507 байт, больше не отправится, по некоторым инет-каналам доходит нормально, но не по всем. Из тестов размера (просто слал разными каналами разного размера пакеты и смотрел сколько теряется) пришел к размеру 1372 байта - это максимум для L2TP с MTU 1400. Делаешь 1373 и больше половины пакетов теряется.
Другие тесты показывают что уменьшение размера пакета никакой пользы не дает, там где проходит 90 из 100 пакетов по 1024 байта, с той же скоростью так же проходит 90 из 100 по 10 байт. Т.е. важен предел размера, а не размер как таковой.
2. Каким алгоритмом контроля потерь слать?
Как выше написал, наблюдаю прямую зависимость потерь от скорости отправки и независимую от размера (пакеты по 10 и 1000 байт теряются одинаково). По предварительным тестам получается что слать надо с частотой половина времени отклика получателя (послать одиночное сообщение получателю, получить ответ). Т.е. пока планируемая стратегия отправки такова: замер времени по запросу-ответу, а дальше один пакет в половину этого времени, получатель периодически сообщает о потерях, на основании чего скорость уменьшается или увеличивается (если потерь нет). Как показала практика отклик одного и того же узла меняется на порядок от 2 мс до 28 мс. Это полноценный проводной инет, что будет с GSM-модемами еще не тестил.
Классическая стратегия окон тоже не подходит, т.к. при быстрой отправке даже 10 пакетов половина может потеряться.
PS Много букав получилось :)