A server that can be used to demonstrate direct (no broker) peer-to-peer communication. It can accept an incoming connection from either the send.c or receive.c example and will act as the directly-connected counterpart (receive or send).
#include <proton/object.h>

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
/* Per-run application state handed to every event handler. */
typedef struct app_data_t {
  /* Taken from argv[1]; main() defaults it to "". */
  const char *host;
  /* Taken from argv[2]; main() defaults it to "amqp". */
  const char *port;
  /* Taken from argv[3]; main() defaults it to "examples". */
  const char *amqp_address;
  /* main() sets this to argv[0]. */
  const char *container_id;
  /* Taken from argv[4]; main() defaults it to 10. */
  int message_count;
  /* Incremented by handle_send() before each send_message() call. */
  int sent;
  /* Incremented by handle_send() and compared against message_count. */
  int acknowledged;
  /* Incremented by handle_receive() and compared against message_count. */
  int received;
} app_data_t;
/* Amount passed to pn_link_flow() by handle_receive() when message_count is 0. */
static const int BATCH = 1000;
/* Process exit status: set to 1 on error paths and returned from main(). */
static int exit_code = 0;
}
exit_code = 1;
}
}
/*
 * Called by handle_send() once for each increment of app->sent, with the
 * link it is sending on.
 *
 * NOTE(review): most of this body is not present in the visible source; only
 * an error path that records failure in exit_code remains, and the braces do
 * not balance. The steps that build and transmit the message cannot be
 * documented from here -- confirm against the complete file.
 */
static void send_message(app_data_t *app,
pn_link_t *sender) {
/* Record the failure so that main() returns a non-zero status. */
exit_code = 1;
}
}
if (!err) {
printf("%s\n", s);
fflush(stdout);
free(s);
free(data.start);
} else {
exit_code = 1;
}
}
/*
 * Event handler used by handle() for the non-sending case (handle() calls
 * this from the else-branch of its `if (l)` test).
 *
 * NOTE(review): the body is fragmentary in the visible source: the switch
 * header, the case labels and the declarations of `l`, `m` and `size` are
 * missing, and the braces do not balance. The comments below describe only
 * the statements that are visible -- confirm against the complete file.
 */
static void handle_receive(app_data_t *app,
pn_event_t* event) {
/* Pass message_count to pn_link_flow(), or BATCH when message_count is 0. */
pn_link_flow(l, app->message_count ? app->message_count : BATCH);
} break;
ssize_t recv;
/* Grow the buffer `m` by `size` bytes. */
m->size += size;
/* NOTE(review): the realloc() result overwrites m->start without a NULL check. */
m->start = (char*)realloc(m->start, m->size);
printf("Message aborted\n");
fflush(stdout);
/* Reset the recorded buffer length to zero. */
m->size = 0;
}
/* A negative result other than PN_EOS is tested for separately here. */
else if (recv < 0 && recv !=
PN_EOS) {
/* Hand the buffered bytes to decode_message(), then reset the buffer descriptor. */
decode_message(*m);
*m = pn_rwbytes_null;
/* NOTE(review): the body of the message_count == 0 branch is not visible. */
if (app->message_count == 0) {
}
/* Count this message; report once the count reaches message_count. */
} else if (++app->received >= app->message_count) {
printf("%d messages received\n", app->received);
}
}
}
break;
}
default:
break;
}
}
/*
 * Event handler used by handle() for the sending case (handle() calls this
 * when its `l` test is true).
 *
 * NOTE(review): the body is fragmentary in the visible source: the switch
 * header, the case labels and the declaration of `sender` are missing, and
 * the braces do not balance. The comments below describe only the
 * statements that are visible -- confirm against the complete file.
 */
static void handle_send(app_data_t* app,
pn_event_t* event) {
} break;
/*
 * Keep sending while pn_link_credit(sender) is positive and fewer than
 * message_count messages have been sent. app->sent is incremented before
 * each send_message() call.
 */
while (
pn_link_credit(sender) > 0 && app->sent < app->message_count) {
++app->sent;
send_message(app, sender);
}
break;
}
/* Count this acknowledgement; report once the count equals message_count. */
if (++app->acknowledged == app->message_count) {
printf("%d messages sent and acknowledged\n", app->acknowledged);
}
}
} break;
default:
break;
}
}
/*
 * Top-level event dispatcher, called by run() for each event.
 *
 * Returns false from one of its cases; run() returns when it sees that.
 * Otherwise returns true only while exit_code is still 0.
 *
 * NOTE(review): the switch header, every case label and most case bodies
 * are missing from the visible source (only their `break;` statements
 * remain), and `l` is not declared here. The comments below describe only
 * the statements that are visible -- confirm against the complete file.
 */
static bool handle(app_data_t* app,
pn_event_t* event) {
/* NOTE(review): the code that fills `port` before it is printed is not visible. */
char port[256];
printf("listening on %s\n", port);
fflush(stdout);
break;
}
break;
break;
break;
}
break;
}
break;
}
break;
break;
break;
break;
break;
/* Clear the application's listener pointer. */
app->listener = NULL;
break;
/* The one case that reports false to the caller. */
return false;
break;
/* Any event not handled above goes to the send or receive handler, chosen by `l`. */
default: {
if (l) {
handle_send(app, event);
} else {
handle_receive(app, event);
}
}
}
}
/* Report true only if no error path has set exit_code. */
return exit_code == 0;
}
/*
 * Loop indefinitely, passing events to handle(), and return as soon as
 * handle() returns false.
 *
 * NOTE(review): the code that obtains `e` and the loop that encloses the
 * handle() call are not present in the visible source (`e` is undeclared and
 * the braces do not balance) -- confirm against the complete file.
 */
void run(app_data_t *app) {
do {
/* handle() returning false ends the run. */
if (!handle(app, e)) {
return;
}
}
} while(true);
}
/*
 * Parse a command-line message count.
 *
 * Accepts only a complete, non-negative decimal integer that fits in an int.
 * strtol() is used instead of atoi() because atoi() cannot report malformed
 * input and its behavior is undefined when the value is out of range.
 *
 * text:  NUL-terminated argument string.
 * count: receives the parsed value on success; left untouched on failure.
 *
 * Returns 0 on success, -1 if text is empty, has trailing characters, is
 * negative, or does not fit in an int.
 */
static int parse_message_count(const char *text, int *count) {
  char *end = NULL;
  long value;

  errno = 0;
  value = strtol(text, &end, 10);
  if (end == text || *end != '\0') {
    return -1; /* no digits at all, or junk after the number */
  }
  if (errno == ERANGE || value < 0 || value > INT_MAX) {
    return -1;
  }
  *count = (int)value;
  return 0;
}

/*
 * Program entry point.
 *
 * Usage: argv[0] [host [port [amqp-address [message-count]]]]
 *
 * Defaults: host "", port "amqp", amqp-address "examples", message-count 10.
 * argv[0] is used as the container id.
 *
 * Returns exit_code after run() finishes, or 1 without running if the
 * message-count argument is not a valid non-negative integer.
 */
int main(int argc, char **argv) {
  struct app_data_t app = {0};

  app.container_id = argv[0];
  app.host = (argc > 1) ? argv[1] : "";
  app.port = (argc > 2) ? argv[2] : "amqp";
  app.amqp_address = (argc > 3) ? argv[3] : "examples";
  app.message_count = 10;
  /*
   * Reject a bad count instead of letting it silently become 0, which the
   * receive path treats specially.
   */
  if (argc > 4 && parse_message_count(argv[4], &app.message_count) != 0) {
    fprintf(stderr, "%s: invalid message count '%s'\n", argv[0], argv[4]);
    return 1;
  }

  run(&app);

  /* Release the message buffers held in the application state. */
  free(app.msgout.start);
  free(app.msgin.start);
  return exit_code;
}