Line data Source code
1 : /* t-poll.c - Check the poll function
2 : * Copyright (C) 2015 g10 Code GmbH
3 : *
4 : * This file is part of libgpg-error.
5 : *
6 : * libgpg-error is free software; you can redistribute it and/or
7 : * modify it under the terms of the GNU Lesser General Public License
8 : * as published by the Free Software Foundation; either version 2.1 of
9 : * the License, or (at your option) any later version.
10 : *
11 : * libgpg-error is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * Lesser General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public
17 : * License along with this program; if not, see <http://www.gnu.org/licenses/>.
18 : */
19 :
20 : /* FIXME: We need much better tests that this very basic one. */
21 :
22 : #if HAVE_CONFIG_H
23 : # include <config.h>
24 : #endif
25 :
26 : #include <stdio.h>
27 : #include <stdlib.h>
28 : #include <string.h>
29 : #include <assert.h>
30 : #include <sys/types.h>
31 : #include <unistd.h>
32 : #include <errno.h>
33 : #ifdef _WIN32
34 : # include <windows.h>
35 : # include <time.h>
36 : #else
37 : # include <pthread.h>
38 : #endif
39 :
40 : #define PGM "t-lock"
41 :
42 : #include "t-common.h"
43 :
44 : #ifdef _WIN32
45 : # define THREAD_RET_TYPE DWORD WINAPI
46 : # define THREAD_RET_VALUE 0
47 : #else
48 : # define THREAD_RET_TYPE void *
49 : # define THREAD_RET_VALUE NULL
50 : #endif
51 :
52 :
53 : /* Object to convey data to a thread. */
54 : struct thread_arg
55 : {
56 : const char *name;
57 : estream_t stream;
58 : volatile int stop_me;
59 : #ifdef USE_POSIX_THREADS
60 : pthread_t thread;
61 : #elif _WIN32
62 : HANDLE thread;
63 : #endif
64 : };
65 :
66 :
67 : static struct thread_arg peer_stdin; /* Thread to feed the stdin. */
68 : static struct thread_arg peer_stdout; /* Thread to feed the stdout. */
69 : static struct thread_arg peer_stderr; /* Thread to feed the stderr. */
70 :
71 : static estream_t test_stdin;
72 : static estream_t test_stdout;
73 : static estream_t test_stderr;
74 :
75 : #if defined(_WIN32) || defined(USE_POSIX_THREADS)
76 :
77 : /* This thread feeds data to the given stream. */
78 : static THREAD_RET_TYPE
79 1 : producer_thread (void *argaddr)
80 : {
81 : struct thread_arg *arg = argaddr;
82 : int i = 0;
83 :
84 : (void)arg;
85 :
86 5 : while (!arg->stop_me && i++ < 3)
87 : {
88 3 : show ("thread '%s' about to write\n", arg->name);
89 3 : es_fprintf (arg->stream, "This is '%s' count=%d\n", arg->name, i);
90 3 : es_fflush (arg->stream);
91 : }
92 1 : es_fclose (arg->stream);
93 1 : return THREAD_RET_VALUE;
94 : }
95 :
96 : /* This thread eats data from the given stream. */
97 : static THREAD_RET_TYPE
98 2 : consumer_thread (void *argaddr)
99 : {
100 : struct thread_arg *arg = argaddr;
101 : char buf[15];
102 :
103 : (void)arg;
104 :
105 13 : while (!arg->stop_me)
106 : {
107 11 : show ("thread '%s' ready to read\n", arg->name);
108 11 : if (!es_fgets (buf, sizeof buf, arg->stream))
109 : {
110 2 : show ("Thread '%s' received EOF or error\n", arg->name);
111 2 : break;
112 : }
113 9 : show ("Thread '%s' got: '%s'\n", arg->name, buf);
114 : }
115 2 : es_fclose (arg->stream);
116 2 : return THREAD_RET_VALUE;
117 : }
118 :
119 : #endif /*_WIN32 || USE_POSIX_THREADS */
120 :
121 :
122 : static void
123 3 : launch_thread (THREAD_RET_TYPE (*fnc)(void *), struct thread_arg *th)
124 : {
125 : #ifdef _WIN32
126 :
127 : th->thread = CreateThread (NULL, 0, fnc, th, 0, NULL);
128 : if (!th->thread)
129 : die ("creating thread '%s' failed: rc=%d", th->name, (int)GetLastError ());
130 : show ("thread '%s' launched (fd=%d)\n", th->name, es_fileno (th->stream));
131 :
132 : #elif USE_POSIX_THREADS
133 :
134 3 : th->stop_me = 0;
135 3 : if (pthread_create (&th->thread, NULL, fnc, th))
136 0 : die ("creating thread '%s' failed: %s\n", th->name, strerror (errno));
137 3 : show ("thread '%s' launched (fd=%d)\n", th->name, es_fileno (th->stream));
138 :
139 : # else /* no thread support */
140 :
141 : verbose++;
142 : show ("no thread support - skipping test\n", PGM);
143 : verbose--;
144 :
145 : #endif /* no thread support */
146 3 : }
147 :
148 :
149 : static void
150 3 : join_thread (struct thread_arg *th)
151 : {
152 : #ifdef _WIN32
153 : int rc;
154 :
155 : rc = WaitForSingleObject (th->thread, INFINITE);
156 : if (rc == WAIT_OBJECT_0)
157 : show ("thread '%s' has terminated\n", th->name);
158 : else
159 : fail ("waiting for thread '%s' failed: %d", th->name, (int)GetLastError ());
160 : CloseHandle (th->thread);
161 :
162 : #elif USE_POSIX_THREADS
163 :
164 3 : pthread_join (th->thread, NULL);
165 3 : show ("thread '%s' has terminated\n", th->name);
166 :
167 : #endif
168 3 : }
169 :
170 :
171 : static void
172 3 : create_pipe (estream_t *r_in, estream_t *r_out)
173 : {
174 : gpg_error_t err;
175 : int filedes[2];
176 :
177 : #ifdef _WIN32
178 : if (_pipe (filedes, 512, 0) == -1)
179 : #else
180 3 : if (pipe (filedes) == -1)
181 : #endif
182 : {
183 : err = gpg_error_from_syserror ();
184 0 : die ("error creating a pipe: %s\n", gpg_strerror (err));
185 : }
186 :
187 3 : show ("created pipe [%d, %d]\n", filedes[0], filedes[1]);
188 :
189 3 : *r_in = es_fdopen (filedes[0], "r");
190 3 : if (!*r_in)
191 : {
192 : err = gpg_error_from_syserror ();
193 0 : die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
194 : }
195 :
196 3 : *r_out = es_fdopen (filedes[1], "w");
197 3 : if (!*r_out)
198 : {
199 : err = gpg_error_from_syserror ();
200 0 : die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
201 : }
202 3 : }
203 :
204 :
205 : static void
206 1 : test_poll (void)
207 : {
208 : int ret;
209 : gpgrt_poll_t fds[3];
210 : char buffer[16];
211 : size_t used, nwritten;
212 : int c;
213 :
214 1 : memset (fds, 0, sizeof fds);
215 1 : fds[0].stream = test_stdin;
216 1 : fds[0].want_read = 1;
217 1 : fds[1].stream = test_stdout;
218 1 : fds[1].want_write = 1;
219 : /* FIXME: We don't use the next stream at all. */
220 1 : fds[2].stream = test_stderr;
221 1 : fds[2].want_write = 1;
222 1 : fds[2].ignore = 1;
223 :
224 :
225 : used = 0;
226 15 : while (used || !fds[0].ignore)
227 : {
228 13 : ret = gpgrt_poll (fds, DIM(fds), -1);
229 13 : if (ret == -1)
230 : {
231 0 : fail ("gpgrt_poll failed: %s\n", strerror (errno));
232 0 : continue;
233 : }
234 13 : if (!ret)
235 : {
236 0 : fail ("gpgrt_poll unexpectedly timed out\n");
237 0 : continue;
238 : }
239 13 : show ("gpgrt_poll detected %d events\n", ret);
240 13 : if (fds[0].got_read)
241 : {
242 : /* Read from the producer. */
243 : for (;;)
244 : {
245 100 : c = es_fgetc (fds[0].stream);
246 100 : if (c == EOF)
247 : {
248 1 : if (es_feof (fds[0].stream))
249 : {
250 1 : show ("reading '%s': EOF\n", peer_stdin.name);
251 1 : fds[0].ignore = 1; /* Not anymore needed. */
252 1 : peer_stdin.stop_me = 1; /* Tell the thread to stop. */
253 : }
254 0 : else if (es_ferror (fds[0].stream))
255 : {
256 0 : fail ("error reading '%s': %s\n",
257 0 : peer_stdin.name, strerror (errno));
258 0 : fds[0].ignore = 1; /* Disable. */
259 0 : peer_stdin.stop_me = 1; /* Tell the thread to stop. */
260 : }
261 : else
262 0 : show ("reading '%s': EAGAIN\n", peer_stdin.name);
263 : break;
264 : }
265 : else
266 : {
267 99 : if (used <= sizeof buffer -1)
268 99 : buffer[used++] = c;
269 99 : if (used == sizeof buffer)
270 : {
271 6 : show ("throttling reading from '%s'\n", peer_stdin.name);
272 6 : fds[0].ignore = 1;
273 6 : break;
274 : }
275 : }
276 : }
277 7 : show ("read from '%s': %zu bytes\n", peer_stdin.name, used);
278 7 : if (used)
279 7 : fds[1].ignore = 0; /* Data to send. */
280 : }
281 13 : if (fds[1].got_write)
282 : {
283 7 : if (used)
284 : {
285 7 : ret = es_write (fds[1].stream, buffer, used, &nwritten);
286 7 : show ("result for writing to '%s': ret=%d, n=%zu, nwritten=%zu\n",
287 : peer_stdout.name, ret, used, nwritten);
288 7 : if (!ret)
289 : {
290 7 : assert (nwritten <= used);
291 7 : memmove (buffer, buffer + nwritten, nwritten);
292 7 : used -= nwritten;
293 : }
294 7 : ret = es_fflush (fds[1].stream);
295 7 : if (ret)
296 0 : fail ("Flushing for '%s' failed: %s\n",
297 0 : peer_stdout.name, strerror (errno));
298 : }
299 7 : if (!used)
300 7 : fds[1].ignore = 1; /* No need to send data. */
301 : }
302 :
303 13 : if (used < sizeof buffer / 2 && !peer_stdin.stop_me && fds[0].ignore)
304 : {
305 6 : show ("accelerate reading from '%s'\n", peer_stdin.name);
306 6 : fds[0].ignore = 0;
307 : }
308 : }
309 1 : }
310 :
311 :
312 : int
313 1 : main (int argc, char **argv)
314 : {
315 : int last_argc = -1;
316 :
317 1 : if (argc)
318 : {
319 1 : argc--; argv++;
320 : }
321 1 : while (argc && last_argc != argc )
322 : {
323 : last_argc = argc;
324 0 : if (!strcmp (*argv, "--help"))
325 : {
326 0 : puts (
327 : "usage: ./t-poll [options]\n"
328 : "\n"
329 : "Options:\n"
330 : " --verbose Show what is going on\n"
331 : " --debug Flyswatter\n"
332 : );
333 0 : exit (0);
334 : }
335 0 : if (!strcmp (*argv, "--verbose"))
336 : {
337 0 : verbose = 1;
338 0 : argc--; argv++;
339 : }
340 0 : else if (!strcmp (*argv, "--debug"))
341 : {
342 0 : verbose = debug = 1;
343 0 : argc--; argv++;
344 : }
345 : }
346 :
347 1 : if (!gpg_error_check_version (GPG_ERROR_VERSION))
348 : {
349 0 : die ("gpg_error_check_version returned an error");
350 : errorcount++;
351 : }
352 :
353 1 : peer_stdin.name = "stdin producer";
354 1 : create_pipe (&test_stdin, &peer_stdin.stream);
355 1 : peer_stdout.name = "stdout consumer";
356 1 : create_pipe (&peer_stdout.stream, &test_stdout);
357 1 : peer_stderr.name = "stderr consumer";
358 1 : create_pipe (&peer_stderr.stream, &test_stderr);
359 :
360 1 : if (es_set_nonblock (test_stdin, 1))
361 0 : fail ("error setting test_stdin to nonblock: %s\n", strerror (errno));
362 1 : if (es_set_nonblock (test_stdout, 1))
363 0 : fail ("error setting test_stdout to nonblock: %s\n", strerror (errno));
364 1 : if (es_set_nonblock (test_stderr, 1))
365 0 : fail ("error setting test_stderr to nonblock: %s\n", strerror (errno));
366 :
367 1 : launch_thread (producer_thread, &peer_stdin );
368 1 : launch_thread (consumer_thread, &peer_stdout);
369 1 : launch_thread (consumer_thread, &peer_stderr);
370 1 : test_poll ();
371 1 : show ("Waiting for threads to terminate...\n");
372 1 : es_fclose (test_stdin);
373 1 : es_fclose (test_stdout);
374 1 : es_fclose (test_stderr);
375 1 : peer_stdin.stop_me = 1;
376 1 : peer_stdout.stop_me = 1;
377 1 : peer_stderr.stop_me = 1;
378 1 : join_thread (&peer_stdin);
379 1 : join_thread (&peer_stdout);
380 1 : join_thread (&peer_stderr);
381 :
382 1 : return errorcount ? 1 : 0;
383 : }
|