source: trunk/third/evolution/e-util/e-msgport.c @ 17553

Revision 17553, 18.0 KB checked in by rbasch, 22 years ago (diff)
Fix to previous revision: In the select() retry loop in e_msgport_wait(), protect against errno getting reset after the select(). Also, simplify by leaving mp->lock unlocked around the entire loop.
Line 
1
2#include "e-msgport.h"
3
4#include <sys/time.h>
5#include <sys/types.h>
6#include <unistd.h>
7#include <errno.h>
8#include <string.h>
9#include <stdio.h>
10
11#include <pthread.h>
12
13#include <glib.h>
14
15#define m(x)                    /* msgport debug */
16#define t(x)                    /* thread debug */
17
18void e_dlist_init(EDList *v)
19{
20        v->head = (EDListNode *)&v->tail;
21        v->tail = 0;
22        v->tailpred = (EDListNode *)&v->head;
23}
24
25EDListNode *e_dlist_addhead(EDList *l, EDListNode *n)
26{
27        n->next = l->head;
28        n->prev = (EDListNode *)&l->head;
29        l->head->prev = n;
30        l->head = n;
31        return n;
32}
33
34EDListNode *e_dlist_addtail(EDList *l, EDListNode *n)
35{
36        n->next = (EDListNode *)&l->tail;
37        n->prev = l->tailpred;
38        l->tailpred->next = n;
39        l->tailpred = n;
40        return n;
41}
42
43EDListNode *e_dlist_remove(EDListNode *n)
44{
45        n->next->prev = n->prev;
46        n->prev->next = n->next;
47        return n;
48}
49
50EDListNode *e_dlist_remhead(EDList *l)
51{
52        EDListNode *n, *nn;
53
54        n = l->head;
55        nn = n->next;
56        if (nn) {
57                nn->prev = n->prev;
58                l->head = nn;
59                return n;
60        }
61        return NULL;
62}
63
64EDListNode *e_dlist_remtail(EDList *l)
65{
66        EDListNode *n, *np;
67
68        n = l->tailpred;
69        np = n->prev;
70        if (np) {
71                np->next = n->next;
72                l->tailpred = np;
73                return n;
74        }
75        return NULL;
76}
77
78int e_dlist_empty(EDList *l)
79{
80        return (l->head == (EDListNode *)&l->tail);
81}
82
83int e_dlist_length(EDList *l)
84{
85        EDListNode *n, *nn;
86        int count = 0;
87
88        n = l->head;
89        nn = n->next;
90        while (nn) {
91                count++;
92                n = nn;
93                nn = n->next;
94        }
95
96        return 0;
97}
98
99struct _EMsgPort {
100        EDList queue;
101        int condwait;           /* how many waiting in condwait */
102        union {
103                int pipe[2];
104                struct {
105                        int read;
106                        int write;
107                } fd;
108        } pipe;
109        /* @#@$#$ glib stuff */
110        GCond *cond;
111        GMutex *lock;
112};
113
114EMsgPort *e_msgport_new(void)
115{
116        EMsgPort *mp;
117
118        mp = g_malloc(sizeof(*mp));
119        e_dlist_init(&mp->queue);
120        mp->lock = g_mutex_new();
121        mp->cond = g_cond_new();
122        mp->pipe.fd.read = -1;
123        mp->pipe.fd.write = -1;
124        mp->condwait = 0;
125
126        return mp;
127}
128
129void e_msgport_destroy(EMsgPort *mp)
130{
131        g_mutex_free(mp->lock);
132        g_cond_free(mp->cond);
133        if (mp->pipe.fd.read != -1) {
134                close(mp->pipe.fd.read);
135                close(mp->pipe.fd.write);
136        }
137        g_free(mp);
138}
139
140/* get a fd that can be used to wait on the port asynchronously */
141int e_msgport_fd(EMsgPort *mp)
142{
143        int fd;
144
145        g_mutex_lock(mp->lock);
146        fd = mp->pipe.fd.read;
147        if (fd == -1) {
148                pipe(mp->pipe.pipe);
149                fd = mp->pipe.fd.read;
150        }
151        g_mutex_unlock(mp->lock);
152
153        return fd;
154}
155
156void e_msgport_put(EMsgPort *mp, EMsg *msg)
157{
158        int fd;
159
160        m(printf("put:\n"));
161        g_mutex_lock(mp->lock);
162        e_dlist_addtail(&mp->queue, &msg->ln);
163        if (mp->condwait > 0) {
164                m(printf("put: condwait > 0, waking up\n"));
165                g_cond_signal(mp->cond);
166        }
167        fd = mp->pipe.fd.write;
168        g_mutex_unlock(mp->lock);
169
170        if (fd != -1) {
171                m(printf("put: have pipe, writing notification to it\n"));
172                write(fd, "", 1);
173        }
174
175        m(printf("put: done\n"));
176}
177
178static void
179msgport_cleanlock(void *data)
180{
181        EMsgPort *mp = data;
182
183        g_mutex_unlock(mp->lock);
184}
185
186EMsg *e_msgport_wait(EMsgPort *mp)
187{
188        EMsg *msg;
189
190        m(printf("wait:\n"));
191        g_mutex_lock(mp->lock);
192        while (e_dlist_empty(&mp->queue)) {
193                if (mp->pipe.fd.read == -1) {
194                        m(printf("wait: waiting on condition\n"));
195                        mp->condwait++;
196                        /* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
197                        pthread_cleanup_push(msgport_cleanlock, mp);
198                        g_cond_wait(mp->cond, mp->lock);
199                        pthread_cleanup_pop(0);
200                        m(printf("wait: got condition\n"));
201                        mp->condwait--;
202                } else {
203                        fd_set rfds;
204                        int retry;
205
206                        m(printf("wait: waitng on pipe\n"));
207                        g_mutex_unlock(mp->lock);
208                        do {
209                                FD_ZERO(&rfds);
210                                FD_SET(mp->pipe.fd.read, &rfds);
211                                retry = (select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL) == -1 && errno == EINTR);
212                                pthread_testcancel();
213                        } while (retry);
214                        g_mutex_lock(mp->lock);
215                        m(printf("wait: got pipe\n"));
216                }
217        }
218        msg = (EMsg *)mp->queue.head;
219        m(printf("wait: message = %p\n", msg));
220        g_mutex_unlock(mp->lock);
221        m(printf("wait: done\n"));
222        return msg;
223}
224
225EMsg *e_msgport_get(EMsgPort *mp)
226{
227        EMsg *msg;
228        char dummy[1];
229
230        g_mutex_lock(mp->lock);
231        msg = (EMsg *)e_dlist_remhead(&mp->queue);
232        if (msg && mp->pipe.fd.read != -1)
233                read(mp->pipe.fd.read, dummy, 1);
234        m(printf("get: message = %p\n", msg));
235        g_mutex_unlock(mp->lock);
236
237        return msg;
238}
239
240void e_msgport_reply(EMsg *msg)
241{
242        if (msg->reply_port) {
243                e_msgport_put(msg->reply_port, msg);
244        }
245        /* else lost? */
246}
247
248struct _thread_info {
249        pthread_t id;
250        int busy;
251};
252
253struct _EThread {
254        EMsgPort *server_port;
255        EMsgPort *reply_port;
256        pthread_mutex_t mutex;
257        e_thread_t type;
258        int queue_limit;
259
260        int waiting;            /* if we are waiting for a new message, count of waiting processes */
261        pthread_t id;           /* id of our running child thread */
262        GList *id_list;         /* if THREAD_NEW, then a list of our child threads in thread_info structs */
263
264        EThreadFunc destroy;
265        void *destroy_data;
266
267        EThreadFunc received;
268        void *received_data;
269
270        EThreadFunc lost;
271        void *lost_data;
272};
273
274#define E_THREAD_NONE ((pthread_t)~0)
275#define E_THREAD_QUIT_REPLYPORT ((struct _EMsgPort *)~0)
276
277static void thread_destroy_msg(EThread *e, EMsg *m);
278
279static struct _thread_info *thread_find(EThread *e, pthread_t id)
280{
281        GList *node;
282        struct _thread_info *info;
283
284        node = e->id_list;
285        while (node) {
286                info = node->data;
287                if (info->id == id)
288                        return info;
289                node = node->next;
290        }
291        return NULL;
292}
293
294#if 0
295static void thread_remove(EThread *e, pthread_t id)
296{
297        GList *node;
298        struct _thread_info *info;
299
300        node = e->id_list;
301        while (node) {
302                info = node->data;
303                if (info->id == id) {
304                        e->id_list = g_list_remove(e->id_list, info);
305                        g_free(info);
306                }
307                node = node->next;
308        }
309}
310#endif
311
312EThread *e_thread_new(e_thread_t type)
313{
314        EThread *e;
315
316        e = g_malloc0(sizeof(*e));
317        pthread_mutex_init(&e->mutex, 0);
318        e->type = type;
319        e->server_port = e_msgport_new();
320        e->id = E_THREAD_NONE;
321        e->queue_limit = INT_MAX;
322
323        return e;
324}
325
326/* close down the threads & resources etc */
327void e_thread_destroy(EThread *e)
328{
329        int busy = FALSE;
330        EMsg *msg;
331        struct _thread_info *info;
332        GList *l;
333
334        /* make sure we soak up all the messages first */
335        while ( (msg = e_msgport_get(e->server_port)) ) {
336                thread_destroy_msg(e, msg);
337        }
338
339        pthread_mutex_lock(&e->mutex);
340
341        switch(e->type) {
342        case E_THREAD_QUEUE:
343        case E_THREAD_DROP:
344                /* if we have a thread, 'kill' it */
345                if (e->id != E_THREAD_NONE) {
346                        pthread_t id = e->id;
347
348                        t(printf("Sending thread '%d' quit message\n", id));
349
350                        e->id = E_THREAD_NONE;
351
352                        msg = g_malloc0(sizeof(*msg));
353                        msg->reply_port = E_THREAD_QUIT_REPLYPORT;
354                        e_msgport_put(e->server_port, msg);
355
356                        pthread_mutex_unlock(&e->mutex);
357                        t(printf("Joining thread '%d'\n", id));
358                        pthread_join(id, 0);
359                        t(printf("Joined thread '%d'!\n", id));
360                        pthread_mutex_lock(&e->mutex);
361                }
362                busy = e->id != E_THREAD_NONE;
363                break;
364        case E_THREAD_NEW:
365                /* first, send everyone a quit message */
366                l = e->id_list;
367                while (l) {
368                        info = l->data;
369                        t(printf("Sending thread '%d' quit message\n", info->id));
370                        msg = g_malloc0(sizeof(*msg));
371                        msg->reply_port = E_THREAD_QUIT_REPLYPORT;
372                        e_msgport_put(e->server_port, msg);
373                        l = l->next;                   
374                }
375
376                /* then, wait for everyone to quit */
377                while (e->id_list) {
378                        info = e->id_list->data;
379                        e->id_list = g_list_remove(e->id_list, info);
380                        pthread_mutex_unlock(&e->mutex);
381                        t(printf("Joining thread '%d'\n", info->id));
382                        pthread_join(info->id, 0);
383                        t(printf("Joined thread '%d'!\n", info->id));
384                        pthread_mutex_lock(&e->mutex);
385                        g_free(info);
386                }
387                busy = g_list_length(e->id_list) != 0;
388                break;
389        }
390
391        pthread_mutex_unlock(&e->mutex);
392
393        /* and clean up, if we can */
394        if (busy) {
395                g_warning("threads were busy, leaked EThread");
396                return;
397        }
398
399        e_msgport_destroy(e->server_port);
400        g_free(e);
401}
402
403/* set the queue maximum depth, what happens when the queue
404   fills up depends on the queue type */
405void e_thread_set_queue_limit(EThread *e, int limit)
406{
407        e->queue_limit = limit;
408}
409
410/* set a msg destroy callback, this can not call any e_thread functions on @e */
411void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data)
412{
413        pthread_mutex_lock(&e->mutex);
414        e->destroy = destroy;
415        e->destroy_data = data;
416        pthread_mutex_unlock(&e->mutex);
417}
418
419/* set a message lost callback, called if any message is discarded */
420void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data)
421{
422        pthread_mutex_lock(&e->mutex);
423        e->lost = lost;
424        e->lost_data = lost;
425        pthread_mutex_unlock(&e->mutex);
426}
427
428/* set a reply port, if set, then send messages back once finished */
429void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port)
430{
431        e->reply_port = reply_port;
432}
433
434/* set a received data callback */
435void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data)
436{
437        pthread_mutex_lock(&e->mutex);
438        e->received = received;
439        e->received_data = data;
440        pthread_mutex_unlock(&e->mutex);
441}
442
443static void
444thread_destroy_msg(EThread *e, EMsg *m)
445{
446        EThreadFunc func;
447        void *func_data;
448
449        /* we do this so we never get an incomplete/unmatched callback + data */
450        pthread_mutex_lock(&e->mutex);
451        func = e->destroy;
452        func_data = e->destroy_data;
453        pthread_mutex_unlock(&e->mutex);
454       
455        if (func)
456                func(e, m, func_data);
457}
458
459static void
460thread_received_msg(EThread *e, EMsg *m)
461{
462        EThreadFunc func;
463        void *func_data;
464
465        /* we do this so we never get an incomplete/unmatched callback + data */
466        pthread_mutex_lock(&e->mutex);
467        func = e->received;
468        func_data = e->received_data;
469        pthread_mutex_unlock(&e->mutex);
470       
471        if (func)
472                func(e, m, func_data);
473        else
474                g_warning("No processing callback for EThread, message unprocessed");
475}
476
477static void
478thread_lost_msg(EThread *e, EMsg *m)
479{
480        EThreadFunc func;
481        void *func_data;
482
483        /* we do this so we never get an incomplete/unmatched callback + data */
484        pthread_mutex_lock(&e->mutex);
485        func = e->lost;
486        func_data = e->lost_data;
487        pthread_mutex_unlock(&e->mutex);
488       
489        if (func)
490                func(e, m, func_data);
491}
492
493/* the actual thread dispatcher */
494static void *
495thread_dispatch(void *din)
496{
497        EThread *e = din;
498        EMsg *m;
499        struct _thread_info *info;
500        pthread_t self = pthread_self();
501
502        t(printf("dispatch thread started: %ld\n", pthread_self()));
503
504        while (1) {
505                pthread_mutex_lock(&e->mutex);
506                m = e_msgport_get(e->server_port);
507                if (m == NULL) {
508                        /* nothing to do?  If we are a 'new' type thread, just quit.
509                           Otherwise, go into waiting (can be cancelled here) */
510                        info = NULL;
511                        switch (e->type) {
512                        case E_THREAD_NEW:
513                        case E_THREAD_QUEUE:
514                        case E_THREAD_DROP:
515                                info = thread_find(e, self);
516                                if (info)
517                                        info->busy = FALSE;
518                                e->waiting++;
519                                pthread_mutex_unlock(&e->mutex);
520                                e_msgport_wait(e->server_port);
521                                pthread_mutex_lock(&e->mutex);
522                                e->waiting--;
523                                pthread_mutex_unlock(&e->mutex);
524                                break;
525#if 0
526                        case E_THREAD_NEW:
527                                e->id_list = g_list_remove(e->id_list, (void *)pthread_self());
528                                pthread_mutex_unlock(&e->mutex);
529                                return 0;
530#endif
531                        }
532
533                        continue;
534                } else if (m->reply_port == E_THREAD_QUIT_REPLYPORT) {
535                        t(printf("Thread %d got quit message\n", self));
536                        /* Handle a quit message, say we're quitting, free the message, and break out of the loop */
537                        info = thread_find(e, self);
538                        if (info)
539                                info->busy = 2;
540                        pthread_mutex_unlock(&e->mutex);
541                        g_free(m);
542                        break;
543                } else {
544                        info = thread_find(e, self);
545                        if (info)
546                                info->busy = TRUE;
547                }
548                pthread_mutex_unlock(&e->mutex);
549
550                t(printf("got message in dispatch thread\n"));
551
552                /* process it */
553                thread_received_msg(e, m);
554
555                /* if we have a reply port, send it back, otherwise, lose it */
556                if (m->reply_port) {
557                        e_msgport_reply(m);
558                } else {
559                        thread_destroy_msg(e, m);
560                }
561        }
562
563        return NULL;
564}
565
566/* send a message to the thread, start thread if necessary */
567void e_thread_put(EThread *e, EMsg *msg)
568{
569        pthread_t id;
570        EMsg *dmsg = NULL;
571
572        pthread_mutex_lock(&e->mutex);
573
574        /* the caller forgot to tell us what to do, well, we can't do anything can we */
575        if (e->received == NULL) {
576                pthread_mutex_unlock(&e->mutex);
577                g_warning("EThread called with no receiver function, no work to do!");
578                thread_destroy_msg(e, msg);
579                return;
580        }
581
582        msg->reply_port = e->reply_port;
583
584        switch(e->type) {
585        case E_THREAD_QUEUE:
586                /* if the queue is full, lose this new addition */
587                if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
588                        e_msgport_put(e->server_port, msg);
589                } else {
590                        printf("queue limit reached, dropping new message\n");
591                        dmsg = msg;
592                }
593                break;
594        case E_THREAD_DROP:
595                /* if the queue is full, lose the oldest (unprocessed) message */
596                if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
597                        e_msgport_put(e->server_port, msg);
598                } else {
599                        printf("queue limit reached, dropping old message\n");
600                        e_msgport_put(e->server_port, msg);
601                        dmsg = e_msgport_get(e->server_port);
602                }
603                break;
604        case E_THREAD_NEW:
605                /* it is possible that an existing thread can catch this message, so
606                   we might create a thread with no work to do.
607                   but that doesn't matter, the other alternative that it be lost is worse */
608                e_msgport_put(e->server_port, msg);
609                if (e->waiting == 0
610                    && g_list_length(e->id_list) < e->queue_limit
611                    && pthread_create(&id, NULL, thread_dispatch, e) == 0) {
612                        struct _thread_info *info = g_malloc0(sizeof(*info));
613                        t(printf("created NEW thread %ld\n", id));
614                        info->id = id;
615                        info->busy = TRUE;
616                        e->id_list = g_list_append(e->id_list, info);
617                }
618                pthread_mutex_unlock(&e->mutex);
619                return;
620        }
621
622        /* create the thread, if there is none to receive it yet */
623        if (e->id == E_THREAD_NONE) {
624                if (pthread_create(&e->id, NULL, thread_dispatch, e) == -1) {
625                        g_warning("Could not create dispatcher thread, message queued?: %s", strerror(errno));
626                        e->id = E_THREAD_NONE;
627                }
628        }
629
630        pthread_mutex_unlock(&e->mutex);
631
632        if (dmsg) {
633                thread_lost_msg(e, dmsg);
634                thread_destroy_msg(e, dmsg);
635        }
636}
637
638/* yet-another-mutex interface */
639struct _EMutex {
640        int type;
641        pthread_t owner;
642        short waiters;
643        short depth;
644        pthread_mutex_t mutex;
645        pthread_cond_t cond;
646};
647
648/* sigh, this is just painful to have to need, but recursive
649   read/write, etc mutexes just aren't very common in thread
650   implementations */
651/* TODO: Just make it use recursive mutexes if they are available */
652EMutex *e_mutex_new(e_mutex_t type)
653{
654        struct _EMutex *m;
655
656        m = g_malloc(sizeof(*m));
657        m->type = type;
658        m->waiters = 0;
659        m->depth = 0;
660        m->owner = E_THREAD_NONE;
661
662        switch (type) {
663        case E_MUTEX_SIMPLE:
664                pthread_mutex_init(&m->mutex, 0);
665                break;
666        case E_MUTEX_REC:
667                pthread_mutex_init(&m->mutex, 0);
668                pthread_cond_init(&m->cond, 0);
669                break;
670                /* read / write ?  flags for same? */
671        }
672
673        return m;
674}
675
676int e_mutex_destroy(EMutex *m)
677{
678        int ret = 0;
679
680        switch (m->type) {
681        case E_MUTEX_SIMPLE:
682                ret = pthread_mutex_destroy(&m->mutex);
683                if (ret == -1)
684                        g_warning("EMutex destroy failed: %s", strerror(errno));
685                g_free(m);
686                break;
687        case E_MUTEX_REC:
688                ret = pthread_mutex_destroy(&m->mutex);
689                if (ret == -1)
690                        g_warning("EMutex destroy failed: %s", strerror(errno));
691                ret = pthread_cond_destroy(&m->cond);
692                if (ret == -1)
693                        g_warning("EMutex destroy failed: %s", strerror(errno));
694                g_free(m);
695
696        }
697        return ret;
698}
699
700int e_mutex_lock(EMutex *m)
701{
702        pthread_t id;
703
704        switch (m->type) {
705        case E_MUTEX_SIMPLE:
706                return pthread_mutex_lock(&m->mutex);
707        case E_MUTEX_REC:
708                id = pthread_self();
709                if (pthread_mutex_lock(&m->mutex) == -1)
710                        return -1;
711                while (1) {
712                        if (m->owner == E_THREAD_NONE) {
713                                m->owner = id;
714                                m->depth = 1;
715                                break;
716                        } else if (id == m->owner) {
717                                m->depth++;
718                                break;
719                        } else {
720                                m->waiters++;
721                                if (pthread_cond_wait(&m->cond, &m->mutex) == -1)
722                                        return -1;
723                                m->waiters--;
724                        }
725                }
726                return pthread_mutex_unlock(&m->mutex);
727        }
728
729        errno = EINVAL;
730        return -1;
731}
732
733int e_mutex_unlock(EMutex *m)
734{
735        switch (m->type) {
736        case E_MUTEX_SIMPLE:
737                return pthread_mutex_unlock(&m->mutex);
738        case E_MUTEX_REC:
739                if (pthread_mutex_lock(&m->mutex) == -1)
740                        return -1;
741                g_assert(m->owner == pthread_self());
742
743                m->depth--;
744                if (m->depth == 0) {
745                        m->owner = E_THREAD_NONE;
746                        if (m->waiters > 0)
747                                pthread_cond_signal(&m->cond);
748                }
749                return pthread_mutex_unlock(&m->mutex);
750        }
751
752        errno = EINVAL;
753        return -1;
754}
755
756void e_mutex_assert_locked(EMutex *m)
757{
758        g_return_if_fail (m->type == E_MUTEX_REC);
759        pthread_mutex_lock(&m->mutex);
760        g_assert(m->owner == pthread_self());
761        pthread_mutex_unlock(&m->mutex);
762}
763
764#ifdef STANDALONE
765EMsgPort *server_port;
766
767
768void *fdserver(void *data)
769{
770        int fd;
771        EMsg *msg;
772        int id = (int)data;
773        fd_set rfds;
774
775        fd = e_msgport_fd(server_port);
776
777        while (1) {
778                int count = 0;
779
780                printf("server %d: waiting on fd %d\n", id, fd);
781                FD_ZERO(&rfds);
782                FD_SET(fd, &rfds);
783                select(fd+1, &rfds, NULL, NULL, NULL);
784                printf("server %d: Got async notification, checking for messages\n", id);
785                while ((msg = e_msgport_get(server_port))) {
786                        printf("server %d: got message\n", id);
787                        sleep(1);
788                        printf("server %d: replying\n", id);
789                        e_msgport_reply(msg);
790                        count++;
791                }
792                printf("server %d: got %d messages\n", id, count);
793        }
794}
795
796void *server(void *data)
797{
798        EMsg *msg;
799        int id = (int)data;
800
801        while (1) {
802                printf("server %d: waiting\n", id);
803                msg = e_msgport_wait(server_port);
804                msg = e_msgport_get(server_port);
805                if (msg) {
806                        printf("server %d: got message\n", id);
807                        sleep(1);
808                        printf("server %d: replying\n", id);
809                        e_msgport_reply(msg);
810                } else {
811                        printf("server %d: didn't get message\n", id);
812                }
813        }
814}
815
816void *client(void *data)
817{
818        EMsg *msg;
819        EMsgPort *replyport;
820        int i;
821
822        replyport = e_msgport_new();
823        msg = g_malloc0(sizeof(*msg));
824        msg->reply_port = replyport;
825        for (i=0;i<10;i++) {
826                /* synchronous operation */
827                printf("client: sending\n");
828                e_msgport_put(server_port, msg);
829                printf("client: waiting for reply\n");
830                e_msgport_wait(replyport);
831                e_msgport_get(replyport);
832                printf("client: got reply\n");
833        }
834
835        printf("client: sleeping ...\n");
836        sleep(2);
837        printf("client: sending multiple\n");
838
839        for (i=0;i<10;i++) {
840                msg = g_malloc0(sizeof(*msg));
841                msg->reply_port = replyport;
842                e_msgport_put(server_port, msg);
843        }
844
845        printf("client: receiving multiple\n");
846        for (i=0;i<10;i++) {
847                e_msgport_wait(replyport);
848                msg = e_msgport_get(replyport);
849                g_free(msg);
850        }
851
852        printf("client: done\n");
853}
854
855int main(int argc, char **argv)
856{
857        pthread_t serverid, clientid;
858
859        g_thread_init(NULL);
860
861        server_port = e_msgport_new();
862
863        /*pthread_create(&serverid, NULL, server, (void *)1);*/
864        pthread_create(&serverid, NULL, fdserver, (void *)1);
865        pthread_create(&clientid, NULL, client, NULL);
866
867        sleep(60);
868
869        return 0;
870}
871#endif
Note: See TracBrowser for help on using the repository browser.