Line data Source code
1 : /* t-poll.c - Check the poll function
2 : * Copyright (C) 2015 g10 Code GmbH
3 : *
4 : * This file is part of libgpg-error.
5 : *
6 : * libgpg-error is free software; you can redistribute it and/or
7 : * modify it under the terms of the GNU Lesser General Public License
8 : * as published by the Free Software Foundation; either version 2.1 of
9 : * the License, or (at your option) any later version.
10 : *
11 : * libgpg-error is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * Lesser General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public
17 : * License along with this program; if not, see <https://www.gnu.org/licenses/>.
18 : */
19 :
20 : /* FIXME: We need much better tests that this very basic one. */
21 :
22 : #if HAVE_CONFIG_H
23 : # include <config.h>
24 : #endif
25 :
26 : #include <stdio.h>
27 : #include <stdlib.h>
28 : #include <string.h>
29 : #include <assert.h>
30 : #include <sys/types.h>
31 : #include <unistd.h>
32 : #include <errno.h>
33 : #ifdef _WIN32
34 : # include <windows.h>
35 : # include <time.h>
36 : #else
37 : # ifdef USE_POSIX_THREADS
38 : # include <pthread.h>
39 : # endif
40 : #endif
41 :
42 : #define PGM "t-poll"
43 :
44 : #include "t-common.h"
45 :
46 : #ifdef _WIN32
47 : # define THREAD_RET_TYPE DWORD WINAPI
48 : # define THREAD_RET_VALUE 0
49 : #else
50 : # define THREAD_RET_TYPE void *
51 : # define THREAD_RET_VALUE NULL
52 : #endif
53 :
54 :
55 : /* Object to convey data to a thread. */
56 : struct thread_arg
57 : {
58 : const char *name;
59 : estream_t stream;
60 : volatile int stop_me;
61 : #ifdef USE_POSIX_THREADS
62 : pthread_t thread;
63 : #elif _WIN32
64 : HANDLE thread;
65 : #endif
66 : };
67 :
68 :
69 : static struct thread_arg peer_stdin; /* Thread to feed the stdin. */
70 : static struct thread_arg peer_stdout; /* Thread to feed the stdout. */
71 : static struct thread_arg peer_stderr; /* Thread to feed the stderr. */
72 :
73 : static estream_t test_stdin;
74 : static estream_t test_stdout;
75 : static estream_t test_stderr;
76 :
77 : #if defined(_WIN32) || defined(USE_POSIX_THREADS)
78 :
79 : /* This thread feeds data to the given stream. */
80 : static THREAD_RET_TYPE
81 1 : producer_thread (void *argaddr)
82 : {
83 : struct thread_arg *arg = argaddr;
84 : int i = 0;
85 :
86 : (void)arg;
87 :
88 5 : while (!arg->stop_me && i++ < 3)
89 : {
90 3 : show ("thread '%s' about to write\n", arg->name);
91 3 : es_fprintf (arg->stream, "This is '%s' count=%d\n", arg->name, i);
92 3 : es_fflush (arg->stream);
93 : }
94 1 : es_fclose (arg->stream);
95 1 : return THREAD_RET_VALUE;
96 : }
97 :
98 : /* This thread eats data from the given stream. */
99 : static THREAD_RET_TYPE
100 2 : consumer_thread (void *argaddr)
101 : {
102 : struct thread_arg *arg = argaddr;
103 : char buf[15];
104 :
105 : (void)arg;
106 :
107 5 : while (!arg->stop_me)
108 : {
109 2 : show ("thread '%s' ready to read\n", arg->name);
110 2 : if (!es_fgets (buf, sizeof buf, arg->stream))
111 : {
112 1 : show ("Thread '%s' received EOF or error\n", arg->name);
113 1 : break;
114 : }
115 1 : show ("Thread '%s' got: '%s'\n", arg->name, buf);
116 : }
117 2 : es_fclose (arg->stream);
118 2 : return THREAD_RET_VALUE;
119 : }
120 :
121 : #endif /*_WIN32 || USE_POSIX_THREADS */
122 :
123 :
124 : static void
125 3 : launch_thread (THREAD_RET_TYPE (*fnc)(void *), struct thread_arg *th)
126 : {
127 : int fd;
128 :
129 3 : th->stop_me = 0;
130 3 : fd = es_fileno (th->stream);
131 : #ifdef _WIN32
132 :
133 : th->thread = CreateThread (NULL, 0, fnc, th, 0, NULL);
134 : if (!th->thread)
135 : die ("creating thread '%s' failed: rc=%d", th->name, (int)GetLastError ());
136 : show ("thread '%s' launched (fd=%d)\n", th->name, fd);
137 :
138 : #elif USE_POSIX_THREADS
139 :
140 3 : if (pthread_create (&th->thread, NULL, fnc, th))
141 0 : die ("creating thread '%s' failed: %s\n", th->name, strerror (errno));
142 3 : show ("thread '%s' launched (fd=%d)\n", th->name, fd);
143 :
144 : # else /* no thread support */
145 :
146 : verbose++;
147 : show ("no thread support - skipping test\n", PGM);
148 : verbose--;
149 :
150 : #endif /* no thread support */
151 3 : }
152 :
153 :
154 : static void
155 3 : join_thread (struct thread_arg *th)
156 : {
157 : #ifdef _WIN32
158 : int rc;
159 :
160 : rc = WaitForSingleObject (th->thread, INFINITE);
161 : if (rc == WAIT_OBJECT_0)
162 : show ("thread '%s' has terminated\n", th->name);
163 : else
164 : fail ("waiting for thread '%s' failed: %d", th->name, (int)GetLastError ());
165 : CloseHandle (th->thread);
166 :
167 : #elif USE_POSIX_THREADS
168 :
169 3 : pthread_join (th->thread, NULL);
170 3 : show ("thread '%s' has terminated\n", th->name);
171 :
172 : #endif
173 3 : }
174 :
175 :
176 : static void
177 3 : create_pipe (estream_t *r_in, estream_t *r_out)
178 : {
179 : gpg_error_t err;
180 : int filedes[2];
181 :
182 : #ifdef _WIN32
183 : if (_pipe (filedes, 512, 0) == -1)
184 : #else
185 3 : if (pipe (filedes) == -1)
186 : #endif
187 : {
188 : err = gpg_error_from_syserror ();
189 0 : die ("error creating a pipe: %s\n", gpg_strerror (err));
190 : }
191 :
192 3 : show ("created pipe [%d, %d]\n", filedes[0], filedes[1]);
193 :
194 3 : *r_in = es_fdopen (filedes[0], "r,pollable");
195 3 : if (!*r_in)
196 : {
197 : err = gpg_error_from_syserror ();
198 0 : die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
199 : }
200 :
201 3 : *r_out = es_fdopen (filedes[1], "w,pollable");
202 3 : if (!*r_out)
203 : {
204 : err = gpg_error_from_syserror ();
205 0 : die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
206 : }
207 3 : }
208 :
209 :
210 : static void
211 1 : test_poll (void)
212 : {
213 : int ret;
214 : gpgrt_poll_t fds[3];
215 : char buffer[16];
216 : size_t used, nwritten;
217 : int c;
218 :
219 1 : memset (fds, 0, sizeof fds);
220 1 : fds[0].stream = test_stdin;
221 1 : fds[0].want_read = 1;
222 1 : fds[1].stream = test_stdout;
223 1 : fds[1].want_write = 1;
224 : /* FIXME: We don't use the next stream at all. */
225 1 : fds[2].stream = test_stderr;
226 1 : fds[2].want_write = 1;
227 1 : fds[2].ignore = 1;
228 :
229 :
230 : used = 0;
231 15 : while (used || !fds[0].ignore)
232 : {
233 13 : ret = gpgrt_poll (fds, DIM(fds), -1);
234 13 : if (ret == -1)
235 : {
236 0 : fail ("gpgrt_poll failed: %s\n", strerror (errno));
237 0 : continue;
238 : }
239 13 : if (!ret)
240 : {
241 0 : fail ("gpgrt_poll unexpectedly timed out\n");
242 0 : continue;
243 : }
244 13 : show ("gpgrt_poll detected %d events\n", ret);
245 13 : if (fds[0].got_read)
246 : {
247 : /* Read from the producer. */
248 : for (;;)
249 : {
250 100 : c = es_fgetc (fds[0].stream);
251 100 : if (c == EOF)
252 : {
253 1 : if (es_feof (fds[0].stream))
254 : {
255 1 : show ("reading '%s': EOF\n", peer_stdin.name);
256 1 : fds[0].ignore = 1; /* Not anymore needed. */
257 1 : peer_stdin.stop_me = 1; /* Tell the thread to stop. */
258 : }
259 0 : else if (es_ferror (fds[0].stream))
260 : {
261 0 : fail ("error reading '%s': %s\n",
262 0 : peer_stdin.name, strerror (errno));
263 0 : fds[0].ignore = 1; /* Disable. */
264 0 : peer_stdin.stop_me = 1; /* Tell the thread to stop. */
265 : }
266 : else
267 0 : show ("reading '%s': EAGAIN\n", peer_stdin.name);
268 : break;
269 : }
270 : else
271 : {
272 99 : if (used <= sizeof buffer -1)
273 99 : buffer[used++] = c;
274 99 : if (used == sizeof buffer)
275 : {
276 6 : show ("throttling reading from '%s'\n", peer_stdin.name);
277 6 : fds[0].ignore = 1;
278 6 : break;
279 : }
280 : }
281 : }
282 7 : show ("read from '%s': %zu bytes\n", peer_stdin.name, used);
283 7 : if (used)
284 7 : fds[1].ignore = 0; /* Data to send. */
285 : }
286 13 : if (fds[1].got_write)
287 : {
288 7 : if (used)
289 : {
290 7 : ret = es_write (fds[1].stream, buffer, used, &nwritten);
291 7 : show ("result for writing to '%s': ret=%d, n=%zu, nwritten=%zu\n",
292 : peer_stdout.name, ret, used, nwritten);
293 7 : if (!ret)
294 : {
295 7 : assert (nwritten <= used);
296 : /* Move the remaining data to the front of buffer. */
297 7 : memmove (buffer, buffer + nwritten,
298 : sizeof buffer - nwritten);
299 7 : used -= nwritten;
300 : }
301 7 : ret = es_fflush (fds[1].stream);
302 7 : if (ret)
303 0 : fail ("Flushing for '%s' failed: %s\n",
304 0 : peer_stdout.name, strerror (errno));
305 : }
306 7 : if (!used)
307 7 : fds[1].ignore = 1; /* No need to send data. */
308 : }
309 :
310 13 : if (used < sizeof buffer / 2 && !peer_stdin.stop_me && fds[0].ignore)
311 : {
312 6 : show ("accelerate reading from '%s'\n", peer_stdin.name);
313 6 : fds[0].ignore = 0;
314 : }
315 : }
316 1 : }
317 :
318 :
319 : int
320 1 : main (int argc, char **argv)
321 : {
322 : int last_argc = -1;
323 :
324 1 : if (argc)
325 : {
326 1 : argc--; argv++;
327 : }
328 1 : while (argc && last_argc != argc )
329 : {
330 : last_argc = argc;
331 0 : if (!strcmp (*argv, "--help"))
332 : {
333 0 : puts (
334 : "usage: ./t-poll [options]\n"
335 : "\n"
336 : "Options:\n"
337 : " --verbose Show what is going on\n"
338 : " --debug Flyswatter\n"
339 : );
340 0 : exit (0);
341 : }
342 0 : if (!strcmp (*argv, "--verbose"))
343 : {
344 0 : verbose = 1;
345 0 : argc--; argv++;
346 : }
347 0 : else if (!strcmp (*argv, "--debug"))
348 : {
349 0 : verbose = debug = 1;
350 0 : argc--; argv++;
351 : }
352 : }
353 :
354 1 : if (!gpg_error_check_version (GPG_ERROR_VERSION))
355 : {
356 0 : die ("gpg_error_check_version returned an error");
357 : errorcount++;
358 : }
359 :
360 1 : peer_stdin.name = "stdin producer";
361 1 : create_pipe (&test_stdin, &peer_stdin.stream);
362 1 : peer_stdout.name = "stdout consumer";
363 1 : create_pipe (&peer_stdout.stream, &test_stdout);
364 1 : peer_stderr.name = "stderr consumer";
365 1 : create_pipe (&peer_stderr.stream, &test_stderr);
366 :
367 1 : if (es_set_nonblock (test_stdin, 1))
368 0 : fail ("error setting test_stdin to nonblock: %s\n", strerror (errno));
369 1 : if (es_set_nonblock (test_stdout, 1))
370 0 : fail ("error setting test_stdout to nonblock: %s\n", strerror (errno));
371 1 : if (es_set_nonblock (test_stderr, 1))
372 0 : fail ("error setting test_stderr to nonblock: %s\n", strerror (errno));
373 :
374 1 : launch_thread (producer_thread, &peer_stdin );
375 1 : launch_thread (consumer_thread, &peer_stdout);
376 1 : launch_thread (consumer_thread, &peer_stderr);
377 1 : test_poll ();
378 1 : show ("Waiting for threads to terminate...\n");
379 1 : es_fclose (test_stdin);
380 1 : es_fclose (test_stdout);
381 1 : es_fclose (test_stderr);
382 1 : peer_stdin.stop_me = 1;
383 1 : peer_stdout.stop_me = 1;
384 1 : peer_stderr.stop_me = 1;
385 1 : join_thread (&peer_stdin);
386 1 : join_thread (&peer_stdout);
387 1 : join_thread (&peer_stderr);
388 :
389 1 : return errorcount ? 1 : 0;
390 : }
|