source: trunk/third/evolution/e-util/e-msgport.c @ 18147

Revision 18147, 21.4 KB checked in by ghudson, 22 years ago (diff)
Merge with evolution 1.2.1.
Line 
1/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2/*
3 * Authors: Michael Zucchi <notzed@ximian.com>
4 *
5 * Copyright 2002 Ximian, Inc. (www.ximian.com)
6 *
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of version 2 of the GNU General Public
9 * License as published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA.
20 *
21 */
22
23#ifdef HAVE_CONFIG_H
24#include <config.h>
25#endif
26
27#include <sys/time.h>
28#include <sys/types.h>
29#include <unistd.h>
30#include <errno.h>
31#include <string.h>
32#include <stdio.h>
33
34#include <pthread.h>
35
36#include <glib.h>
37
38#ifdef HAVE_NSS
39#include <nspr.h>
40#endif
41
42#include "e-msgport.h"
43
44#define m(x)                    /* msgport debug */
45#define t(x)                    /* thread debug */
46
47void e_dlist_init(EDList *v)
48{
49        v->head = (EDListNode *)&v->tail;
50        v->tail = 0;
51        v->tailpred = (EDListNode *)&v->head;
52}
53
54EDListNode *e_dlist_addhead(EDList *l, EDListNode *n)
55{
56        n->next = l->head;
57        n->prev = (EDListNode *)&l->head;
58        l->head->prev = n;
59        l->head = n;
60        return n;
61}
62
63EDListNode *e_dlist_addtail(EDList *l, EDListNode *n)
64{
65        n->next = (EDListNode *)&l->tail;
66        n->prev = l->tailpred;
67        l->tailpred->next = n;
68        l->tailpred = n;
69        return n;
70}
71
72EDListNode *e_dlist_remove(EDListNode *n)
73{
74        n->next->prev = n->prev;
75        n->prev->next = n->next;
76        return n;
77}
78
79EDListNode *e_dlist_remhead(EDList *l)
80{
81        EDListNode *n, *nn;
82
83        n = l->head;
84        nn = n->next;
85        if (nn) {
86                nn->prev = n->prev;
87                l->head = nn;
88                return n;
89        }
90        return NULL;
91}
92
93EDListNode *e_dlist_remtail(EDList *l)
94{
95        EDListNode *n, *np;
96
97        n = l->tailpred;
98        np = n->prev;
99        if (np) {
100                np->next = n->next;
101                l->tailpred = np;
102                return n;
103        }
104        return NULL;
105}
106
107int e_dlist_empty(EDList *l)
108{
109        return (l->head == (EDListNode *)&l->tail);
110}
111
112int e_dlist_length(EDList *l)
113{
114        EDListNode *n, *nn;
115        int count = 0;
116
117        n = l->head;
118        nn = n->next;
119        while (nn) {
120                count++;
121                n = nn;
122                nn = n->next;
123        }
124
125        return count;
126}
127
128struct _EMsgPort {
129        EDList queue;
130        int condwait;           /* how many waiting in condwait */
131        union {
132                int pipe[2];
133                struct {
134                        int read;
135                        int write;
136                } fd;
137        } pipe;
138#ifdef HAVE_NSS
139        struct {
140                PRFileDesc *read;
141                PRFileDesc *write;
142        } prpipe;
143#endif
144        /* @#@$#$ glib stuff */
145        GCond *cond;
146        GMutex *lock;
147};
148
149EMsgPort *e_msgport_new(void)
150{
151        EMsgPort *mp;
152
153        mp = g_malloc(sizeof(*mp));
154        e_dlist_init(&mp->queue);
155        mp->lock = g_mutex_new();
156        mp->cond = g_cond_new();
157        mp->pipe.fd.read = -1;
158        mp->pipe.fd.write = -1;
159#ifdef HAVE_NSS
160        mp->prpipe.read = NULL;
161        mp->prpipe.write = NULL;
162#endif
163        mp->condwait = 0;
164
165        return mp;
166}
167
168void e_msgport_destroy(EMsgPort *mp)
169{
170        g_mutex_free(mp->lock);
171        g_cond_free(mp->cond);
172        if (mp->pipe.fd.read != -1) {
173                close(mp->pipe.fd.read);
174                close(mp->pipe.fd.write);
175        }
176#ifdef HAVE_NSS
177        if (mp->prpipe.read) {
178                PR_Close(mp->prpipe.read);
179                PR_Close(mp->prpipe.write);
180        }
181#endif
182        g_free(mp);
183}
184
185/* get a fd that can be used to wait on the port asynchronously */
186int e_msgport_fd(EMsgPort *mp)
187{
188        int fd;
189
190        g_mutex_lock(mp->lock);
191        fd = mp->pipe.fd.read;
192        if (fd == -1) {
193                pipe(mp->pipe.pipe);
194                fd = mp->pipe.fd.read;
195        }
196        g_mutex_unlock(mp->lock);
197
198        return fd;
199}
200
201#ifdef HAVE_NSS
202PRFileDesc *e_msgport_prfd(EMsgPort *mp)
203{
204        PRFileDesc *fd;
205
206        g_mutex_lock(mp->lock);
207        fd = mp->prpipe.read;
208        if (fd == NULL) {
209                PR_CreatePipe(&mp->prpipe.read, &mp->prpipe.write);
210                fd = mp->prpipe.read;
211        }
212        g_mutex_unlock(mp->lock);
213
214        return fd;
215}
216#endif
217
218void e_msgport_put(EMsgPort *mp, EMsg *msg)
219{
220        int fd;
221#ifdef HAVE_NSS
222        PRFileDesc *prfd;
223#endif
224
225        m(printf("put:\n"));
226        g_mutex_lock(mp->lock);
227        e_dlist_addtail(&mp->queue, &msg->ln);
228        if (mp->condwait > 0) {
229                m(printf("put: condwait > 0, waking up\n"));
230                g_cond_signal(mp->cond);
231        }
232        fd = mp->pipe.fd.write;
233#ifdef HAVE_NSS
234        prfd = mp->prpipe.write;
235#endif
236        g_mutex_unlock(mp->lock);
237
238        if (fd != -1) {
239                m(printf("put: have pipe, writing notification to it\n"));
240                write(fd, "", 1);
241        }
242
243#ifdef HAVE_NSS
244        if (prfd != NULL) {
245                m(printf("put: have pr pipe, writing notification to it\n"));
246                PR_Write(prfd, "", 1);
247        }
248#endif
249        m(printf("put: done\n"));
250}
251
252static void
253msgport_cleanlock(void *data)
254{
255        EMsgPort *mp = data;
256
257        g_mutex_unlock(mp->lock);
258}
259
260EMsg *e_msgport_wait(EMsgPort *mp)
261{
262        EMsg *msg;
263
264        m(printf("wait:\n"));
265        g_mutex_lock(mp->lock);
266        while (e_dlist_empty(&mp->queue)) {
267                if (mp->pipe.fd.read != -1) {
268                        fd_set rfds;
269                        int retry;
270
271                        m(printf("wait: waitng on pipe\n"));
272                        g_mutex_unlock(mp->lock);
273                        do {
274                                FD_ZERO(&rfds);
275                                FD_SET(mp->pipe.fd.read, &rfds);
276                                retry = select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL) == -1 && errno == EINTR;
277                                pthread_testcancel();
278                        } while (retry);
279                        g_mutex_lock(mp->lock);
280                        m(printf("wait: got pipe\n"));
281#ifdef HAVE_NSS
282                } else if (mp->prpipe.read != NULL) {
283                        PRPollDesc polltable[1];
284                        int retry;
285
286                        m(printf("wait: waitng on pr pipe\n"));
287                        g_mutex_unlock(mp->lock);
288                        do {
289                                polltable[0].fd = mp->prpipe.read;
290                                polltable[0].in_flags = PR_POLL_READ|PR_POLL_ERR;
291                                retry = PR_Poll(polltable, 1, PR_INTERVAL_NO_TIMEOUT) == -1 && PR_GetError() == PR_PENDING_INTERRUPT_ERROR;
292                                pthread_testcancel();
293                        } while (retry);
294                        g_mutex_lock(mp->lock);
295                        m(printf("wait: got pr pipe\n"));
296#endif /* HAVE_NSS */
297                } else {
298                        m(printf("wait: waiting on condition\n"));
299                        mp->condwait++;
300                        /* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
301                        pthread_cleanup_push(msgport_cleanlock, mp);
302                        g_cond_wait(mp->cond, mp->lock);
303                        pthread_cleanup_pop(0);
304                        m(printf("wait: got condition\n"));
305                        mp->condwait--;
306                }
307        }
308        msg = (EMsg *)mp->queue.head;
309        m(printf("wait: message = %p\n", msg));
310        g_mutex_unlock(mp->lock);
311        m(printf("wait: done\n"));
312        return msg;
313}
314
315EMsg *e_msgport_get(EMsgPort *mp)
316{
317        EMsg *msg;
318        char dummy[1];
319
320        g_mutex_lock(mp->lock);
321        msg = (EMsg *)e_dlist_remhead(&mp->queue);
322        if (msg) {
323                if (mp->pipe.fd.read != -1)
324                        read(mp->pipe.fd.read, dummy, 1);
325#ifdef HAVE_NSS
326                if (mp->prpipe.read != NULL) {
327                        int c;
328                        c = PR_Read(mp->prpipe.read, dummy, 1);
329                        g_assert(c == 1);
330                }
331#endif
332        }
333        m(printf("get: message = %p\n", msg));
334        g_mutex_unlock(mp->lock);
335
336        return msg;
337}
338
339void e_msgport_reply(EMsg *msg)
340{
341        if (msg->reply_port) {
342                e_msgport_put(msg->reply_port, msg);
343        }
344        /* else lost? */
345}
346
347struct _thread_info {
348        pthread_t id;
349        int busy;
350};
351
352struct _EThread {
353        struct _EThread *next;
354        struct _EThread *prev;
355
356        EMsgPort *server_port;
357        EMsgPort *reply_port;
358        pthread_mutex_t mutex;
359        e_thread_t type;
360        int queue_limit;
361
362        int waiting;            /* if we are waiting for a new message, count of waiting processes */
363        pthread_t id;           /* id of our running child thread */
364        GList *id_list;         /* if THREAD_NEW, then a list of our child threads in thread_info structs */
365
366        EThreadFunc destroy;
367        void *destroy_data;
368
369        EThreadFunc received;
370        void *received_data;
371
372        EThreadFunc lost;
373        void *lost_data;
374};
375
376/* All active threads */
377static EDList ethread_list = E_DLIST_INITIALISER(ethread_list);
378static pthread_mutex_t ethread_lock = PTHREAD_MUTEX_INITIALIZER;
379
380#define E_THREAD_NONE ((pthread_t)~0)
381#define E_THREAD_QUIT_REPLYPORT ((struct _EMsgPort *)~0)
382
383static void thread_destroy_msg(EThread *e, EMsg *m);
384
385static struct _thread_info *thread_find(EThread *e, pthread_t id)
386{
387        GList *node;
388        struct _thread_info *info;
389
390        node = e->id_list;
391        while (node) {
392                info = node->data;
393                if (info->id == id)
394                        return info;
395                node = node->next;
396        }
397        return NULL;
398}
399
400#if 0
401static void thread_remove(EThread *e, pthread_t id)
402{
403        GList *node;
404        struct _thread_info *info;
405
406        node = e->id_list;
407        while (node) {
408                info = node->data;
409                if (info->id == id) {
410                        e->id_list = g_list_remove(e->id_list, info);
411                        g_free(info);
412                }
413                node = node->next;
414        }
415}
416#endif
417
418EThread *e_thread_new(e_thread_t type)
419{
420        EThread *e;
421
422        e = g_malloc0(sizeof(*e));
423        pthread_mutex_init(&e->mutex, 0);
424        e->type = type;
425        e->server_port = e_msgport_new();
426        e->id = E_THREAD_NONE;
427        e->queue_limit = INT_MAX;
428
429        pthread_mutex_lock(&ethread_lock);
430        e_dlist_addtail(&ethread_list, (EDListNode *)e);
431        pthread_mutex_unlock(&ethread_lock);
432
433        return e;
434}
435
436/* close down the threads & resources etc */
437void e_thread_destroy(EThread *e)
438{
439        int busy = FALSE;
440        EMsg *msg;
441        struct _thread_info *info;
442        GList *l;
443
444        /* make sure we soak up all the messages first */
445        while ( (msg = e_msgport_get(e->server_port)) ) {
446                thread_destroy_msg(e, msg);
447        }
448
449        pthread_mutex_lock(&e->mutex);
450
451        switch(e->type) {
452        case E_THREAD_QUEUE:
453        case E_THREAD_DROP:
454                /* if we have a thread, 'kill' it */
455                if (e->id != E_THREAD_NONE) {
456                        pthread_t id = e->id;
457
458                        t(printf("Sending thread '%d' quit message\n", id));
459
460                        e->id = E_THREAD_NONE;
461
462                        msg = g_malloc0(sizeof(*msg));
463                        msg->reply_port = E_THREAD_QUIT_REPLYPORT;
464                        e_msgport_put(e->server_port, msg);
465
466                        pthread_mutex_unlock(&e->mutex);
467                        t(printf("Joining thread '%d'\n", id));
468                        pthread_join(id, 0);
469                        t(printf("Joined thread '%d'!\n", id));
470                        pthread_mutex_lock(&e->mutex);
471                }
472                busy = e->id != E_THREAD_NONE;
473                break;
474        case E_THREAD_NEW:
475                /* first, send everyone a quit message */
476                l = e->id_list;
477                while (l) {
478                        info = l->data;
479                        t(printf("Sending thread '%d' quit message\n", info->id));
480                        msg = g_malloc0(sizeof(*msg));
481                        msg->reply_port = E_THREAD_QUIT_REPLYPORT;
482                        e_msgport_put(e->server_port, msg);
483                        l = l->next;                   
484                }
485
486                /* then, wait for everyone to quit */
487                while (e->id_list) {
488                        info = e->id_list->data;
489                        e->id_list = g_list_remove(e->id_list, info);
490                        pthread_mutex_unlock(&e->mutex);
491                        t(printf("Joining thread '%d'\n", info->id));
492                        pthread_join(info->id, 0);
493                        t(printf("Joined thread '%d'!\n", info->id));
494                        pthread_mutex_lock(&e->mutex);
495                        g_free(info);
496                }
497                busy = g_list_length(e->id_list) != 0;
498                break;
499        }
500
501        pthread_mutex_unlock(&e->mutex);
502
503        /* and clean up, if we can */
504        if (busy) {
505                g_warning("threads were busy, leaked EThread");
506                return;
507        }
508
509        pthread_mutex_lock(&ethread_lock);
510        e_dlist_remove((EDListNode *)e);
511        pthread_mutex_unlock(&ethread_lock);
512
513        pthread_mutex_destroy(&e->mutex);
514        e_msgport_destroy(e->server_port);
515        g_free(e);
516}
517
518/* set the queue maximum depth, what happens when the queue
519   fills up depends on the queue type */
520void e_thread_set_queue_limit(EThread *e, int limit)
521{
522        e->queue_limit = limit;
523}
524
525/* set a msg destroy callback, this can not call any e_thread functions on @e */
526void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data)
527{
528        pthread_mutex_lock(&e->mutex);
529        e->destroy = destroy;
530        e->destroy_data = data;
531        pthread_mutex_unlock(&e->mutex);
532}
533
534/* set a message lost callback, called if any message is discarded */
535void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data)
536{
537        pthread_mutex_lock(&e->mutex);
538        e->lost = lost;
539        e->lost_data = lost;
540        pthread_mutex_unlock(&e->mutex);
541}
542
543/* set a reply port, if set, then send messages back once finished */
544void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port)
545{
546        e->reply_port = reply_port;
547}
548
549/* set a received data callback */
550void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data)
551{
552        pthread_mutex_lock(&e->mutex);
553        e->received = received;
554        e->received_data = data;
555        pthread_mutex_unlock(&e->mutex);
556}
557
558/* find out if we're busy doing any work, e==NULL, check for all work */
559int e_thread_busy(EThread *e)
560{
561        int busy = FALSE;
562
563        if (e == NULL) {
564                pthread_mutex_lock(&ethread_lock);
565                e = (EThread *)ethread_list.head;
566                while (e->next && !busy) {
567                        busy = e_thread_busy(e);
568                        e = e->next;
569                }
570                pthread_mutex_unlock(&ethread_lock);
571        } else {
572                pthread_mutex_lock(&e->mutex);
573                switch (e->type) {
574                case E_THREAD_QUEUE:
575                case E_THREAD_DROP:
576                        busy = e->waiting != 1 && e->id != E_THREAD_NONE;
577                        break;
578                case E_THREAD_NEW:
579                        busy = e->waiting != g_list_length(e->id_list);
580                        break;
581                }
582                pthread_mutex_unlock(&e->mutex);
583        }
584
585        return busy;
586}
587
588static void
589thread_destroy_msg(EThread *e, EMsg *m)
590{
591        EThreadFunc func;
592        void *func_data;
593
594        /* we do this so we never get an incomplete/unmatched callback + data */
595        pthread_mutex_lock(&e->mutex);
596        func = e->destroy;
597        func_data = e->destroy_data;
598        pthread_mutex_unlock(&e->mutex);
599       
600        if (func)
601                func(e, m, func_data);
602}
603
604static void
605thread_received_msg(EThread *e, EMsg *m)
606{
607        EThreadFunc func;
608        void *func_data;
609
610        /* we do this so we never get an incomplete/unmatched callback + data */
611        pthread_mutex_lock(&e->mutex);
612        func = e->received;
613        func_data = e->received_data;
614        pthread_mutex_unlock(&e->mutex);
615       
616        if (func)
617                func(e, m, func_data);
618        else
619                g_warning("No processing callback for EThread, message unprocessed");
620}
621
622static void
623thread_lost_msg(EThread *e, EMsg *m)
624{
625        EThreadFunc func;
626        void *func_data;
627
628        /* we do this so we never get an incomplete/unmatched callback + data */
629        pthread_mutex_lock(&e->mutex);
630        func = e->lost;
631        func_data = e->lost_data;
632        pthread_mutex_unlock(&e->mutex);
633       
634        if (func)
635                func(e, m, func_data);
636}
637
638/* the actual thread dispatcher */
639static void *
640thread_dispatch(void *din)
641{
642        EThread *e = din;
643        EMsg *m;
644        struct _thread_info *info;
645        pthread_t self = pthread_self();
646
647        t(printf("dispatch thread started: %ld\n", pthread_self()));
648
649        while (1) {
650                pthread_mutex_lock(&e->mutex);
651                m = e_msgport_get(e->server_port);
652                if (m == NULL) {
653                        /* nothing to do?  If we are a 'new' type thread, just quit.
654                           Otherwise, go into waiting (can be cancelled here) */
655                        info = NULL;
656                        switch (e->type) {
657                        case E_THREAD_NEW:
658                        case E_THREAD_QUEUE:
659                        case E_THREAD_DROP:
660                                info = thread_find(e, self);
661                                if (info)
662                                        info->busy = FALSE;
663                                e->waiting++;
664                                pthread_mutex_unlock(&e->mutex);
665                                e_msgport_wait(e->server_port);
666                                pthread_mutex_lock(&e->mutex);
667                                e->waiting--;
668                                pthread_mutex_unlock(&e->mutex);
669                                break;
670#if 0
671                        case E_THREAD_NEW:
672                                e->id_list = g_list_remove(e->id_list, (void *)pthread_self());
673                                pthread_mutex_unlock(&e->mutex);
674                                return 0;
675#endif
676                        }
677
678                        continue;
679                } else if (m->reply_port == E_THREAD_QUIT_REPLYPORT) {
680                        t(printf("Thread %d got quit message\n", self));
681                        /* Handle a quit message, say we're quitting, free the message, and break out of the loop */
682                        info = thread_find(e, self);
683                        if (info)
684                                info->busy = 2;
685                        pthread_mutex_unlock(&e->mutex);
686                        g_free(m);
687                        break;
688                } else {
689                        info = thread_find(e, self);
690                        if (info)
691                                info->busy = TRUE;
692                }
693                pthread_mutex_unlock(&e->mutex);
694
695                t(printf("got message in dispatch thread\n"));
696
697                /* process it */
698                thread_received_msg(e, m);
699
700                /* if we have a reply port, send it back, otherwise, lose it */
701                if (m->reply_port) {
702                        e_msgport_reply(m);
703                } else {
704                        thread_destroy_msg(e, m);
705                }
706        }
707
708        return NULL;
709}
710
711/* send a message to the thread, start thread if necessary */
712void e_thread_put(EThread *e, EMsg *msg)
713{
714        pthread_t id;
715        EMsg *dmsg = NULL;
716
717        pthread_mutex_lock(&e->mutex);
718
719        /* the caller forgot to tell us what to do, well, we can't do anything can we */
720        if (e->received == NULL) {
721                pthread_mutex_unlock(&e->mutex);
722                g_warning("EThread called with no receiver function, no work to do!");
723                thread_destroy_msg(e, msg);
724                return;
725        }
726
727        msg->reply_port = e->reply_port;
728
729        switch(e->type) {
730        case E_THREAD_QUEUE:
731                /* if the queue is full, lose this new addition */
732                if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
733                        e_msgport_put(e->server_port, msg);
734                } else {
735                        printf("queue limit reached, dropping new message\n");
736                        dmsg = msg;
737                }
738                break;
739        case E_THREAD_DROP:
740                /* if the queue is full, lose the oldest (unprocessed) message */
741                if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
742                        e_msgport_put(e->server_port, msg);
743                } else {
744                        printf("queue limit reached, dropping old message\n");
745                        e_msgport_put(e->server_port, msg);
746                        dmsg = e_msgport_get(e->server_port);
747                }
748                break;
749        case E_THREAD_NEW:
750                /* it is possible that an existing thread can catch this message, so
751                   we might create a thread with no work to do.
752                   but that doesn't matter, the other alternative that it be lost is worse */
753                e_msgport_put(e->server_port, msg);
754                if (e->waiting == 0
755                    && g_list_length(e->id_list) < e->queue_limit
756                    && pthread_create(&id, NULL, thread_dispatch, e) == 0) {
757                        struct _thread_info *info = g_malloc0(sizeof(*info));
758                        t(printf("created NEW thread %ld\n", id));
759                        info->id = id;
760                        info->busy = TRUE;
761                        e->id_list = g_list_append(e->id_list, info);
762                }
763                pthread_mutex_unlock(&e->mutex);
764                return;
765        }
766
767        /* create the thread, if there is none to receive it yet */
768        if (e->id == E_THREAD_NONE) {
769                if (pthread_create(&e->id, NULL, thread_dispatch, e) == -1) {
770                        g_warning("Could not create dispatcher thread, message queued?: %s", strerror(errno));
771                        e->id = E_THREAD_NONE;
772                }
773        }
774
775        pthread_mutex_unlock(&e->mutex);
776
777        if (dmsg) {
778                thread_lost_msg(e, dmsg);
779                thread_destroy_msg(e, dmsg);
780        }
781}
782
783/* yet-another-mutex interface */
784struct _EMutex {
785        int type;
786        pthread_t owner;
787        short waiters;
788        short depth;
789        pthread_mutex_t mutex;
790        pthread_cond_t cond;
791};
792
793/* sigh, this is just painful to have to need, but recursive
794   read/write, etc mutexes just aren't very common in thread
795   implementations */
796/* TODO: Just make it use recursive mutexes if they are available */
797EMutex *e_mutex_new(e_mutex_t type)
798{
799        struct _EMutex *m;
800
801        m = g_malloc(sizeof(*m));
802        m->type = type;
803        m->waiters = 0;
804        m->depth = 0;
805        m->owner = E_THREAD_NONE;
806
807        switch (type) {
808        case E_MUTEX_SIMPLE:
809                pthread_mutex_init(&m->mutex, 0);
810                break;
811        case E_MUTEX_REC:
812                pthread_mutex_init(&m->mutex, 0);
813                pthread_cond_init(&m->cond, 0);
814                break;
815                /* read / write ?  flags for same? */
816        }
817
818        return m;
819}
820
821int e_mutex_destroy(EMutex *m)
822{
823        int ret = 0;
824
825        switch (m->type) {
826        case E_MUTEX_SIMPLE:
827                ret = pthread_mutex_destroy(&m->mutex);
828                if (ret == -1)
829                        g_warning("EMutex destroy failed: %s", strerror(errno));
830                g_free(m);
831                break;
832        case E_MUTEX_REC:
833                ret = pthread_mutex_destroy(&m->mutex);
834                if (ret == -1)
835                        g_warning("EMutex destroy failed: %s", strerror(errno));
836                ret = pthread_cond_destroy(&m->cond);
837                if (ret == -1)
838                        g_warning("EMutex destroy failed: %s", strerror(errno));
839                g_free(m);
840
841        }
842        return ret;
843}
844
845int e_mutex_lock(EMutex *m)
846{
847        pthread_t id;
848
849        switch (m->type) {
850        case E_MUTEX_SIMPLE:
851                return pthread_mutex_lock(&m->mutex);
852        case E_MUTEX_REC:
853                id = pthread_self();
854                if (pthread_mutex_lock(&m->mutex) == -1)
855                        return -1;
856                while (1) {
857                        if (m->owner == E_THREAD_NONE) {
858                                m->owner = id;
859                                m->depth = 1;
860                                break;
861                        } else if (id == m->owner) {
862                                m->depth++;
863                                break;
864                        } else {
865                                m->waiters++;
866                                if (pthread_cond_wait(&m->cond, &m->mutex) == -1)
867                                        return -1;
868                                m->waiters--;
869                        }
870                }
871                return pthread_mutex_unlock(&m->mutex);
872        }
873
874        errno = EINVAL;
875        return -1;
876}
877
878int e_mutex_unlock(EMutex *m)
879{
880        switch (m->type) {
881        case E_MUTEX_SIMPLE:
882                return pthread_mutex_unlock(&m->mutex);
883        case E_MUTEX_REC:
884                if (pthread_mutex_lock(&m->mutex) == -1)
885                        return -1;
886                g_assert(m->owner == pthread_self());
887
888                m->depth--;
889                if (m->depth == 0) {
890                        m->owner = E_THREAD_NONE;
891                        if (m->waiters > 0)
892                                pthread_cond_signal(&m->cond);
893                }
894                return pthread_mutex_unlock(&m->mutex);
895        }
896
897        errno = EINVAL;
898        return -1;
899}
900
901void e_mutex_assert_locked(EMutex *m)
902{
903        g_return_if_fail (m->type == E_MUTEX_REC);
904        pthread_mutex_lock(&m->mutex);
905        g_assert(m->owner == pthread_self());
906        pthread_mutex_unlock(&m->mutex);
907}
908
909#ifdef STANDALONE
910EMsgPort *server_port;
911
912
913void *fdserver(void *data)
914{
915        int fd;
916        EMsg *msg;
917        int id = (int)data;
918        fd_set rfds;
919
920        fd = e_msgport_fd(server_port);
921
922        while (1) {
923                int count = 0;
924
925                printf("server %d: waiting on fd %d\n", id, fd);
926                FD_ZERO(&rfds);
927                FD_SET(fd, &rfds);
928                select(fd+1, &rfds, NULL, NULL, NULL);
929                printf("server %d: Got async notification, checking for messages\n", id);
930                while ((msg = e_msgport_get(server_port))) {
931                        printf("server %d: got message\n", id);
932                        sleep(1);
933                        printf("server %d: replying\n", id);
934                        e_msgport_reply(msg);
935                        count++;
936                }
937                printf("server %d: got %d messages\n", id, count);
938        }
939}
940
941void *server(void *data)
942{
943        EMsg *msg;
944        int id = (int)data;
945
946        while (1) {
947                printf("server %d: waiting\n", id);
948                msg = e_msgport_wait(server_port);
949                msg = e_msgport_get(server_port);
950                if (msg) {
951                        printf("server %d: got message\n", id);
952                        sleep(1);
953                        printf("server %d: replying\n", id);
954                        e_msgport_reply(msg);
955                } else {
956                        printf("server %d: didn't get message\n", id);
957                }
958        }
959}
960
961void *client(void *data)
962{
963        EMsg *msg;
964        EMsgPort *replyport;
965        int i;
966
967        replyport = e_msgport_new();
968        msg = g_malloc0(sizeof(*msg));
969        msg->reply_port = replyport;
970        for (i=0;i<10;i++) {
971                /* synchronous operation */
972                printf("client: sending\n");
973                e_msgport_put(server_port, msg);
974                printf("client: waiting for reply\n");
975                e_msgport_wait(replyport);
976                e_msgport_get(replyport);
977                printf("client: got reply\n");
978        }
979
980        printf("client: sleeping ...\n");
981        sleep(2);
982        printf("client: sending multiple\n");
983
984        for (i=0;i<10;i++) {
985                msg = g_malloc0(sizeof(*msg));
986                msg->reply_port = replyport;
987                e_msgport_put(server_port, msg);
988        }
989
990        printf("client: receiving multiple\n");
991        for (i=0;i<10;i++) {
992                e_msgport_wait(replyport);
993                msg = e_msgport_get(replyport);
994                g_free(msg);
995        }
996
997        printf("client: done\n");
998}
999
1000int main(int argc, char **argv)
1001{
1002        pthread_t serverid, clientid;
1003
1004        g_thread_init(NULL);
1005
1006        server_port = e_msgport_new();
1007
1008        /*pthread_create(&serverid, NULL, server, (void *)1);*/
1009        pthread_create(&serverid, NULL, fdserver, (void *)1);
1010        pthread_create(&clientid, NULL, client, NULL);
1011
1012        sleep(60);
1013
1014        return 0;
1015}
1016#endif
Note: See TracBrowser for help on using the repository browser.