1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
//! Multi - initiating multiple requests simultaneously

use std::time::Duration;

use libc::{c_int, c_char, c_void, c_long};
use curl_sys;

use MultiError;
use easy::Easy;
use panic;

/// A multi handle for initiating multiple connections simultaneously.
///
/// This structure corresponds to `CURLM` in libcurl and provides the ability to
/// have multiple transfers in flight simultaneously. This handle is then used
/// to manage each transfer. The main purpose of a `CURLM` is for the
/// *application* to drive the I/O rather than libcurl itself doing all the
/// blocking. Methods like `action` allow the application to inform libcurl of
/// when events have happened.
///
/// Lots more documentation can be found on the libcurl [multi tutorial] where
/// the APIs correspond pretty closely with this crate.
///
/// [multi tutorial]: https://curl.haxx.se/libcurl/c/libcurl-multi.html
pub struct Multi {
    // Raw libcurl multi handle. Obtained from `curl_multi_init` in
    // `Multi::new` and handed to every `curl_multi_*` call made by this type.
    raw: *mut curl_sys::CURLM,
    // Storage for the socket/timer callbacks. A pointer to the boxed
    // `MultiData` is registered with libcurl as callback user data in
    // `_socket_function` / `_timer_function`; presumably it is boxed so that
    // this address stays the same when the `Multi` value itself is moved.
    data: Box<MultiData>,
}

// Callbacks owned by a `Multi`. libcurl receives a raw pointer to this struct
// as the user data for both the socket and the timer trampolines.
struct MultiData {
    // Called by the socket trampoline with the socket, the events libcurl
    // requests on it, and the per-socket token (the `socketp` value).
    socket: Box<FnMut(Socket, SocketEvents, usize) + Send>,
    // Called by the timer trampoline with the new timeout (`None` when
    // libcurl passes -1). Returning `false` makes the trampoline report -1.
    timer: Box<FnMut(Option<Duration>) -> bool + Send>,
}

/// Message from the `messages` function of a multi handle.
///
/// Currently only indicates whether a transfer is done.
pub struct Message<'multi> {
    // Pointer returned by `curl_multi_info_read`; dereferenced by `result`.
    ptr: *mut curl_sys::CURLMsg,
    // Borrow of the multi handle the message was read from. Never read; it
    // only ties the message's lifetime to that handle.
    _multi: &'multi Multi,
}

/// Wrapper around an easy handle while it's owned by a multi handle.
///
/// Once an easy handle has been added to a multi handle then it can no longer
/// be used via `perform`. This handle is also used to remove the easy handle
/// from the multi handle when desired.
pub struct EasyHandle<'multi> {
    // Multi handle this easy handle was added to; needed to remove it again.
    multi: &'multi Multi,
    // The wrapped easy handle. `Some` while it is attached; `remove` takes it
    // out, after which the destructor's removal attempt becomes a no-op.
    easy: Option<Easy>,
}

/// Notification of the events that have happened on a socket.
///
/// This type is passed as an argument to the `action` method on a multi handle
/// to indicate what events have occurred on a socket.
pub struct Events {
    // Bitmask built from the `CURL_CSELECT_*` flags; passed verbatim to
    // `curl_multi_socket_action` by `Multi::action`.
    bits: c_int,
}

/// Notification of events that are requested on a socket.
///
/// This type is yielded to the `socket_function` callback to indicate what
/// events are requested on a socket.
pub struct SocketEvents {
    // The `what` value libcurl passed to the socket callback; the accessor
    // methods test it against the `CURL_POLL_*` constants.
    bits: c_int,
}

/// Raw underlying socket type that the multi handles use.
///
/// This is an alias for `curl_sys::curl_socket_t`. Values of this type are
/// handed to the `socket_function` callback and accepted by `assign` and
/// `action`.
pub type Socket = curl_sys::curl_socket_t;

impl Multi {
    /// Creates a new multi session through which multiple HTTP transfers can be
    /// initiated.
    pub fn new() -> Multi {
        let handle = unsafe {
            // Crate-level initialization hook (presumably libcurl's one-time
            // global setup); it runs before the handle is created.
            ::init();
            curl_sys::curl_multi_init()
        };
        // A null handle means libcurl could not allocate one; treat it as fatal.
        assert!(!handle.is_null());

        Multi {
            raw: handle,
            // Start out with do-nothing callbacks: the socket callback ignores
            // its arguments and the timer callback always reports success.
            data: Box::new(MultiData {
                socket: Box::new(|_, _, _| ()),
                timer: Box::new(|_| true),
            }),
        }
    }

    /// Set the callback informed about what to wait for
    ///
    /// When the `action` function runs, it informs the application about
    /// updates in the socket (file descriptor) status by doing none, one, or
    /// multiple calls to the socket callback. The callback gets status updates
    /// with changes since the previous time the callback was called. See
    /// `action` for more details on how the callback is used and should work.
    ///
    /// The `SocketEvents` parameter informs the callback on the status of the
    /// given socket, and the methods on that type can be used to learn about
    /// what's going on with the socket.
    ///
    /// The third `usize` parameter is a custom value set by the `assign` method
    /// below.
    pub fn socket_function<F>(&mut self, f: F) -> Result<(), MultiError>
        where F: FnMut(Socket, SocketEvents, usize) + Send + 'static,
    {
        self._socket_function(Box::new(f))
    }

    fn _socket_function(&mut self,
                        f: Box<FnMut(Socket, SocketEvents, usize) + Send>)
                        -> Result<(), MultiError>
    {
        // C-ABI trampoline registered with libcurl. It recovers the
        // `MultiData` from the user pointer and forwards to the stored
        // closure. The call is wrapped in the crate's `panic::catch` helper
        // (presumably so a panic does not unwind into libcurl). Always
        // returns 0.
        //
        // TODO: figure out how to expose `_easy`
        extern fn socket_cb(_easy: *mut curl_sys::CURL,
                            socket: curl_sys::curl_socket_t,
                            what: c_int,
                            userptr: *mut c_void,
                            socketp: *mut c_void) -> c_int {
            panic::catch(|| unsafe {
                let f = &mut (*(userptr as *mut MultiData)).socket;
                f(socket, SocketEvents { bits: what }, socketp as usize)
            });
            0
        }

        // Install the new closure before (re-)registering the trampoline.
        self.data.socket = f;

        // `setopt_ptr` only accepts a `*const c_char`, so the function pointer
        // is cast through `usize` to fit that signature.
        let cb: curl_sys::curl_socket_callback = socket_cb;
        try!(self.setopt_ptr(curl_sys::CURLMOPT_SOCKETFUNCTION,
                             cb as usize as *const c_char));

        // The trampoline calls an `FnMut` through this pointer, i.e. it
        // mutates the `MultiData`. The pointer therefore has to come from a
        // mutable borrow; deriving it from `&*self.data` and later casting it
        // to `*mut` would write through a shared reference.
        let ptr = &mut *self.data as *mut MultiData;
        try!(self.setopt_ptr(curl_sys::CURLMOPT_SOCKETDATA,
                             ptr as *const c_char));
        Ok(())
    }

    /// Set data to associate with an internal socket
    ///
    /// This function creates an association in the multi handle between the
    /// given socket and a private token of the application. This is designed
    /// for `action` uses.
    ///
    /// When set, the token will be passed to all future socket callbacks for
    /// the specified socket.
    ///
    /// If the given socket isn't already in use by libcurl, this function will
    /// return an error.
    ///
    /// libcurl only keeps one single token associated with a socket, so
    /// calling this function several times for the same socket will make the
    /// last set token get used.
    ///
    /// The idea here being that this association (socket to token) is something
    /// that just about every application that uses this API will need and then
    /// libcurl can just as well do it since it already has an internal hash
    /// table lookup for this.
    ///
    /// # Typical Usage
    ///
    /// In a typical application you allocate a struct or at least use some kind
    /// of semi-dynamic data for each socket that we must wait for action on
    /// when using the `action` approach.
    ///
    /// When our socket-callback gets called by libcurl and we get to know about
    /// yet another socket to wait for, we can use `assign` to point out the
    /// particular data so that when we get updates about this same socket
    /// again, we don't have to find the struct associated with this socket by
    /// ourselves.
    pub fn assign(&self,
                  socket: Socket,
                  token: usize) -> Result<(), MultiError> {
        unsafe {
            try!(cvt(curl_sys::curl_multi_assign(self.raw, socket,
                                                 token as *mut _)));
            Ok(())
        }
    }

    /// Set callback to receive timeout values
    ///
    /// Certain features, such as timeouts and retries, require you to call
    /// libcurl even when there is no activity on the file descriptors.
    ///
    /// Your callback function should install a non-repeating timer with the
    /// interval specified. Each time that timer fires, call either `action` or
    /// `perform`, depending on which interface you use.
    ///
    /// A timeout value of `None` means you should delete your timer.
    ///
    /// A timeout value of 0 means you should call `action` or `perform` (once)
    /// as soon as possible.
    ///
    /// This callback will only be called when the timeout changes.
    ///
    /// The timer callback should return `true` on success, and `false` on
    /// error. This callback can be used instead of, or in addition to,
    /// `get_timeout`.
    pub fn timer_function<F>(&mut self, f: F) -> Result<(), MultiError>
        where F: FnMut(Option<Duration>) -> bool + Send + 'static,
    {
        self._timer_function(Box::new(f))
    }

    fn _timer_function(&mut self,
                        f: Box<FnMut(Option<Duration>) -> bool + Send>)
                        -> Result<(), MultiError>
    {
        // C-ABI trampoline registered with libcurl. A timeout of -1 is
        // translated to `None`, anything else to a millisecond `Duration`.
        // Returns 0 when the closure returned `true`, and -1 when it returned
        // `false` or when `panic::catch` produced no value.
        //
        // TODO: figure out how to expose `_multi`
        extern fn timer_cb(_multi: *mut curl_sys::CURLM,
                           timeout_ms: c_long,
                           user: *mut c_void) -> c_int {
            let keep_going = panic::catch(|| unsafe {
                let f = &mut (*(user as *mut MultiData)).timer;
                if timeout_ms == -1 {
                    f(None)
                } else {
                    f(Some(Duration::from_millis(timeout_ms as u64)))
                }
            }).unwrap_or(false);
            if keep_going {0} else {-1}
        }

        // Install the new closure before (re-)registering the trampoline.
        self.data.timer = f;

        // `setopt_ptr` only accepts a `*const c_char`, so the function pointer
        // is cast through `usize` to fit that signature.
        let cb: curl_sys::curl_multi_timer_callback = timer_cb;
        try!(self.setopt_ptr(curl_sys::CURLMOPT_TIMERFUNCTION,
                             cb as usize as *const c_char));

        // The trampoline calls an `FnMut` through this pointer, i.e. it
        // mutates the `MultiData`. The pointer therefore has to come from a
        // mutable borrow; deriving it from `&*self.data` and later casting it
        // to `*mut` would write through a shared reference.
        let ptr = &mut *self.data as *mut MultiData;
        try!(self.setopt_ptr(curl_sys::CURLMOPT_TIMERDATA,
                             ptr as *const c_char));
        Ok(())
    }

    fn setopt_ptr(&mut self,
                  opt: curl_sys::CURLMoption,
                  val: *const c_char) -> Result<(), MultiError> {
        // Sets a pointer-valued option on the multi handle and converts
        // libcurl's status code into a `Result`.
        let rc = unsafe { curl_sys::curl_multi_setopt(self.raw, opt, val) };
        cvt(rc)
    }

    /// Add an easy handle to a multi session
    ///
    /// Adds a standard easy handle to the multi stack. This function call will
    /// make this multi handle control the specified easy handle.
    ///
    /// When an easy interface is added to a multi handle, it will use a shared
    /// connection cache owned by the multi handle. Removing and adding new easy
    /// handles will not affect the pool of connections or the ability to do
    /// connection re-use.
    ///
    /// If you have `timer_function` set in the multi handle (and you really
    /// should if you're working event-based with `action` and friends), that
    /// callback will be called from within this function to ask for an updated
    /// timer so that your main event loop will get the activity on this handle
    /// to get started.
    ///
    /// The easy handle will remain added to the multi handle until you remove
    /// it again with `remove` on the returned handle - even when a transfer
    /// with that specific easy handle is completed.
    pub fn add(&self, easy: Easy) -> Result<EasyHandle, MultiError> {
        unsafe {
            try!(cvt(curl_sys::curl_multi_add_handle(self.raw, easy.raw())));
        }
        Ok(EasyHandle {
            multi: self,
            easy: Some(easy),
        })
    }

    /// Read multi stack informationals
    ///
    /// Ask the multi handle if there are any messages/informationals from the
    /// individual transfers. Messages may include informationals such as an
    /// error code from the transfer or just the fact that a transfer is
    /// completed. More details on these should be written down as well.
    pub fn messages<F>(&self, mut f: F) where F: FnMut(Message) {
        self._messages(&mut f)
    }

    fn _messages(&self, f: &mut FnMut(Message)) {
        // Out-parameter of `curl_multi_info_read`; its value is not used here.
        let mut pending = 0;
        loop {
            let msg = unsafe {
                curl_sys::curl_multi_info_read(self.raw, &mut pending)
            };
            // A null pointer signals that there is nothing left to read.
            if msg.is_null() {
                return
            }
            f(Message { ptr: msg, _multi: self });
        }
    }

    /// Inform of reads/writes available data given an action
    ///
    /// When the application has detected action on a socket handled by libcurl,
    /// it should call this function with the sockfd argument set to
    /// the socket with the action. When the events on a socket are known, they
    /// can be passed `events`. When the events on a socket are unknown, pass
    /// `Events::new()` instead, and libcurl will test the descriptor
    /// internally.
    ///
    /// The returned integer will contain the number of running easy handles
    /// within the multi handle. When this number reaches zero, all transfers
    /// are complete/done. When you call `action` on a specific socket and the
    /// counter decreases by one, it DOES NOT necessarily mean that this exact
    /// socket/transfer is the one that completed. Use `messages` to figure out
    /// which easy handle completed.
    ///
    /// The `action` function informs the application about updates in the
    /// socket (file descriptor) status by doing none, one, or multiple calls to
    /// the socket callback function set with the `socket_function` method. They
    /// update the status with changes since the previous time the callback was
    /// called.
    pub fn action(&self, socket: Socket, events: &Events)
                  -> Result<u32, MultiError> {
        let mut remaining = 0;
        unsafe {
            try!(cvt(curl_sys::curl_multi_socket_action(self.raw,
                                                        socket,
                                                        events.bits,
                                                        &mut remaining)));
            Ok(remaining as u32)
        }
    }

    /// Inform libcurl that a timeout has expired and sockets should be tested.
    ///
    /// The returned integer will contain the number of running easy handles
    /// within the multi handle. When this number reaches zero, all transfers
    /// are complete/done. When you call `action` on a specific socket and the
    /// counter decreases by one, it DOES NOT necessarily mean that this exact
    /// socket/transfer is the one that completed. Use `messages` to figure out
    /// which easy handle completed.
    ///
    /// Get the timeout time by calling the `timer_function` method. Your
    /// application will then get called with information on how long to wait
    /// for socket actions at most before doing the timeout action: call the
    /// `timeout` method. You can also use the `get_timeout` function to
    /// poll the value at any given time, but for an event-based system using
    /// the callback is far better than relying on polling the timeout value.
    pub fn timeout(&self) -> Result<u32, MultiError> {
        let mut remaining = 0;
        unsafe {
            try!(cvt(curl_sys::curl_multi_socket_action(self.raw,
                                                        curl_sys::CURL_SOCKET_BAD,
                                                        0,
                                                        &mut remaining)));
            Ok(remaining as u32)
        }
    }

    /// Get how long to wait for action before proceeding
    ///
    /// An application using the libcurl multi interface should call
    /// `get_timeout` to figure out how long it should wait for socket actions -
    /// at most - before proceeding.
    ///
    /// Proceeding means either doing the socket-style timeout action: call the
    /// `timeout` function, or call `perform` if you're using the simpler and
    /// older multi interface approach.
    ///
    /// The timeout value returned is the duration at this very moment. If 0, it
    /// means you should proceed immediately without waiting for anything. If it
    /// returns `None`, there's no timeout at all set.
    ///
    /// Note: if libcurl returns a `None` timeout here, it just means that
    /// libcurl currently has no stored timeout value. You must not wait too
    /// long (more than a few seconds perhaps) before you call `perform` again.
    pub fn get_timeout(&self) -> Result<Option<Duration>, MultiError> {
        let mut ms = 0;
        unsafe {
            try!(cvt(curl_sys::curl_multi_timeout(self.raw, &mut ms)));
            if ms == -1 {
                Ok(None)
            } else {
                Ok(Some(Duration::from_millis(ms as u64)))
            }
        }
    }

    /// Reads/writes available data from each easy handle.
    ///
    /// This function handles transfers on all the added handles that need
    /// attention in a non-blocking fashion.
    ///
    /// When an application has found out there's data available for this handle
    /// or a timeout has elapsed, the application should call this function to
    /// read/write whatever there is to read or write right now etc.  This
    /// method returns as soon as the reads/writes are done. This function does
    /// not require that there actually is any data available for reading or
    /// that data can be written, it can be called just in case. It will return
    /// the number of handles that still transfer data.
    ///
    /// If the amount of running handles is changed from the previous call (or
    /// is less than the amount of easy handles you've added to the multi
    /// handle), you know that there is one or more transfers less "running".
    /// You can then call `messages` to get information about each individual
    /// completed transfer, and that returned info includes `Error` and more.
    /// If an added handle fails very quickly, it may never be counted as a
    /// running handle.
    ///
    /// When the number of running handles returned by this function is zero
    /// (0), there are no longer any transfers in progress.
    ///
    /// # Return
    ///
    /// Before libcurl version 7.20.0: If you receive `is_call_perform`, this
    /// basically means that you should call `perform` again, before you select
    /// on more actions. You don't have to do it immediately, but the return
    /// code means that libcurl may have more data available to return or that
    /// there may be more data to send off before it is "satisfied". Do note
    /// that `perform` will return `is_call_perform` only when it wants to be
    /// called again immediately. When things are fine and there is nothing
    /// immediate it wants done, it'll return `Ok` and you need to wait for
    /// "action" and then call this function again.
    ///
    /// This function only returns errors etc regarding the whole multi stack.
    /// Problems still might have occurred on individual transfers even when
    /// this function returns `Ok`. Use `messages` to figure out how individual
    /// transfers did.
    pub fn perform(&self) -> Result<u32, MultiError> {
        unsafe {
            let mut ret = 0;
            try!(cvt(curl_sys::curl_multi_perform(self.raw, &mut ret)));
            Ok(ret as u32)
        }
    }

    /// Attempt to close the multi handle and clean up all associated resources.
    ///
    /// Cleans up and removes a whole multi stack. It does not free or touch any
    /// individual easy handles in any way - they still need to be closed
    /// individually.
    pub fn close(&self) -> Result<(), MultiError> {
        // NOTE(review): `Drop for Multi` calls `close` unconditionally, and
        // this method neither consumes `self` nor clears `raw`. Calling
        // `close` explicitly therefore leads to `curl_multi_cleanup` running a
        // second time on the same pointer when the value is dropped. Fixing
        // that needs a signature or struct change - confirm and track.
        let rc = unsafe { curl_sys::curl_multi_cleanup(self.raw) };
        cvt(rc)
    }
}

// Converts a libcurl multi status code into a `Result`: `CURLM_OK` becomes
// `Ok(())`, every other code is wrapped in a `MultiError`.
fn cvt(code: curl_sys::CURLMcode) -> Result<(), MultiError> {
    if code != curl_sys::CURLM_OK {
        return Err(MultiError::new(code));
    }
    Ok(())
}

impl Drop for Multi {
    fn drop(&mut self) {
        let _ = self.close();
    }
}

impl<'m> EasyHandle<'m> {
    /// Femove this easy handle from a multi session
    ///
    /// Removes this easy handle from the multi handle. This will make the
    /// returned easy handle be removed from this multi handle's control.
    ///
    /// When the easy handle has been removed from a multi stack, it is again
    /// perfectly legal to invoke `perform` on this easy handle.
    ///
    /// Removing an easy handle while being used is perfectly legal and will
    /// effectively halt the transfer in progress involving that easy handle.
    /// All other easy handles and transfers will remain unaffected.
    pub fn remove(mut self) -> Result<Easy, MultiError> {
        try!(self._remove());
        Ok(self.easy.take().unwrap())
    }

    fn _remove(&self) -> Result<(), MultiError> {
        if let Some(easy) = self.easy.as_ref() {
            unsafe {
                try!(cvt(curl_sys::curl_multi_remove_handle(self.multi.raw,
                                                            easy.raw())));
            }
        }
        Ok(())
    }
}

impl<'m> Drop for EasyHandle<'m> {
    fn drop(&mut self) {
        let _ = self._remove();
    }
}

impl<'multi> Message<'multi> {
    /// Returns the outcome of the transfer when this message reports that a
    /// transfer has finished.
    ///
    /// `Some(..)` is returned for a "done" message and carries the transfer's
    /// result; any other kind of message yields `None`.
    pub fn result(&self) -> Option<Result<(), MultiError>> {
        unsafe {
            let msg = self.ptr;
            if (*msg).msg != curl_sys::CURLMSG_DONE {
                return None;
            }
            // NOTE(review): the payload is interpreted as a `CURLMcode` here.
            // libcurl appears to store the easy handle's `CURLcode` in this
            // field, so the resulting `MultiError` may be mislabelled -
            // confirm against the libcurl docs before relying on it.
            Some(cvt((*msg).data as curl_sys::CURLMcode))
        }
    }

    // TODO: expose the easy handle somehow...
}

impl Events {
    /// Creates an empty event set in which no flag is raised.
    pub fn new() -> Events {
        Events { bits: 0 }
    }

    /// Sets (`true`) or clears (`false`) the flag indicating that input is
    /// ready.
    pub fn input(&mut self, val: bool) -> &mut Events {
        self.flag(curl_sys::CURL_CSELECT_IN, val)
    }

    /// Sets (`true`) or clears (`false`) the flag indicating that output is
    /// ready.
    pub fn output(&mut self, val: bool) -> &mut Events {
        self.flag(curl_sys::CURL_CSELECT_OUT, val)
    }

    /// Sets (`true`) or clears (`false`) the flag indicating that an error
    /// has happened.
    pub fn error(&mut self, val: bool) -> &mut Events {
        self.flag(curl_sys::CURL_CSELECT_ERR, val)
    }

    // Turns the bits of `flag` on or off depending on `val` and hands `self`
    // back so that calls can be chained.
    fn flag(&mut self, flag: c_int, val: bool) -> &mut Events {
        self.bits = if val {
            self.bits | flag
        } else {
            self.bits & !flag
        };
        self
    }
}

impl SocketEvents {
    /// Wait for incoming data. For the socket to become readable.
    pub fn input(&self) -> bool {
        self.bits & curl_sys::CURL_POLL_IN == curl_sys::CURL_POLL_IN
    }

    /// Wait for outgoing data. For the socket to become writable.
    pub fn output(&self) -> bool {
        self.bits & curl_sys::CURL_POLL_OUT == curl_sys::CURL_POLL_OUT
    }

    /// Wait for incoming and outgoing data. For the socket to become readable
    /// or writable.
    pub fn input_and_output(&self) -> bool {
        self.bits & curl_sys::CURL_POLL_INOUT == curl_sys::CURL_POLL_INOUT
    }

    /// The specified socket/file descriptor is no longer used by libcurl.
    pub fn remove(&self) -> bool {
        self.bits & curl_sys::CURL_POLL_REMOVE == curl_sys::CURL_POLL_REMOVE
    }
}