Estimate http connection latency and bandwidth.
authorTimothy B. Terriberry <tterribe@xiph.org>
Sun, 23 Sep 2012 06:12:41 +0000 (23:12 -0700)
committerTimothy B. Terriberry <tterribe@xiph.org>
Sun, 23 Sep 2012 06:12:41 +0000 (23:12 -0700)
This gives us a better idea when to re-use a connection.

src/http.c
src/opusfile.c

index 7638970..020117a 100644 (file)
@@ -159,10 +159,10 @@ static const char *op_parse_file_url(const char *_src){
 #if defined(OP_ENABLE_HTTP)
 # include <sys/types.h>
 # include <sys/socket.h>
-# include <sys/time.h>
+# include <sys/timeb.h>
 # include <arpa/inet.h>
+# include <netient/in.h>
 # include <fcntl.h>
-# include <netinet/in.h>
 # include <netdb.h>
 # include <poll.h>
 # include <unistd.h>
@@ -443,8 +443,17 @@ static int op_sb_append_nonnegative_int64(OpusStringBuf *_sb,opus_int64 _i){
 struct OpusHTTPConn{
   /*The current position indicator for this connection.*/
   opus_int64    pos;
+  /*The SSL connection, if this is https.*/
   SSL          *ssl_conn;
+  /*The next connection in either the LRU or free list.*/
   OpusHTTPConn *next;
+  /*The last time we blocked for reading from this connection.*/
+  struct timeb  read_time;
+  /*The number of bytes we've read since the last time we blocked.*/
+  opus_int64    read_bytes;
+  /*The estimated throughput of this connection, in bytes/s.*/
+  opus_int64    read_rate;
+  /*The socket we're reading from.*/
   int           fd;
 };
 
@@ -482,6 +491,8 @@ struct OpusHTTPStream{
   /*The connection we're currently reading from.
     This can be -1 if no connection is active.*/
   int              cur_conni;
+  /*The estimated time required to open a new connection, in milliseconds.*/
+  opus_int32       connect_rate;
   /*Information about the address we connected to.*/
   struct addrinfo  addr_info;
   /*The address we connected to.*/
@@ -609,7 +620,7 @@ static int op_do_ssl_step(SSL *_ssl_conn,int _fd,op_ssl_step_func _step){
 # define OP_NPROTOS (2)
 
 static int op_http_connect(OpusHTTPStream *_stream,OpusHTTPConn *_conn,
- struct addrinfo *_addrs){
+ struct addrinfo *_addrs,struct timeb *_start_time){
   struct addrinfo *addr;
   struct addrinfo *addrs[OP_NPROTOS];
   struct pollfd    fds[OP_NPROTOS];
@@ -640,6 +651,11 @@ static int op_http_connect(OpusHTTPStream *_stream,OpusHTTPConn *_conn,
       nprotos++;
     }
   }
+  ret=ftime(_start_time);
+  OP_ASSERT(!ret);
+  *&_conn->read_time=*_start_time;
+  _conn->read_bytes=0;
+  _conn->read_rate=0;
   /*Try to start a connection to each protocol.*/
   for(pi=0;pi<nprotos;pi++){
     ai_family=addrs[pi]->ai_family;
@@ -866,6 +882,36 @@ static int op_http_conn_write_fully(OpusHTTPConn *_conn,
   return 0;
 }
 
+static opus_int32 op_time_diff_ms(const struct timeb *_end,
+ const struct timeb *_start){
+  opus_int64 dtime;
+  dtime=_end->time-_start->time;
+  OP_ASSERT(_end->millitm<1000);
+  OP_ASSERT(_start->millitm<1000);
+  if(OP_UNLIKELY(dtime>(0x7FFFFFFF-1000)/1000))return 0x7FFFFFFF;
+  if(OP_UNLIKELY(dtime<(-0x7FFFFFFF+999)/1000))return -0x7FFFFFFF-1;
+  return (opus_int32)dtime*1000+_end->millitm-_start->millitm;
+}
+
+/*Update the read rate for this connection.*/
+static void op_http_conn_read_rate_update(OpusHTTPConn *_conn){
+  struct timeb read_time;
+  opus_int32   read_delta_ms;
+  opus_int64   read_delta_bytes;
+  opus_int64   read_rate;
+  int          ret;
+  ret=ftime(&read_time);
+  OP_ASSERT(!ret);
+  read_delta_ms=op_time_diff_ms(&read_time,&_conn->read_time);
+  read_delta_bytes=_conn->read_bytes;
+  read_rate=_conn->read_rate;
+  read_delta_ms=OP_MAX(read_delta_ms,1);
+  read_rate+=read_delta_bytes*1000/read_delta_ms-read_rate+4>>3;
+  *&_conn->read_time=*&read_time;
+  _conn->read_bytes=0;
+  _conn->read_rate=read_rate;
+}
+
 /*Tries to read from the given connection.
   [out] _buf: Returns the data read.
   _size:      The size of the buffer.
@@ -875,9 +921,10 @@ static ptrdiff_t op_http_conn_read(OpusHTTPConn *_conn,
   struct pollfd   fd;
   SSL            *ssl_conn;
   ptrdiff_t       nread;
+  ptrdiff_t       nread_unblocked;
   fd.fd=_conn->fd;
   ssl_conn=_conn->ssl_conn;
-  nread=0;
+  nread=nread_unblocked=0;
   do{
     int err;
     if(ssl_conn!=NULL){
@@ -887,6 +934,7 @@ static ptrdiff_t op_http_conn_read(OpusHTTPConn *_conn,
         /*Read some data.
           Keep going to see if there's more.*/
         nread+=ret;
+        nread_unblocked+=ret;
         continue;
       }
       /*Connection closed.*/
@@ -907,6 +955,7 @@ static ptrdiff_t op_http_conn_read(OpusHTTPConn *_conn,
         /*Read some data.
           Keep going to see if there's more.*/
         nread+=ret;
+        nread_unblocked+=ret;
         continue;
       }
       /*If we already read some data, return it right now.*/
@@ -915,11 +964,15 @@ static ptrdiff_t op_http_conn_read(OpusHTTPConn *_conn,
       if(err!=EAGAIN&&err!=EWOULDBLOCK)return 0;
       fd.events=POLLIN;
     }
+    _conn->read_bytes+=nread_unblocked;
+    op_http_conn_read_rate_update(_conn);
+    nread_unblocked=0;
     if(!_block)break;
     /*Need to wait to get any data at all.*/
     if(poll(&fd,1,-1)==-1)return 0;
   }
   while(nread<_size);
+  _conn->read_bytes+=nread_unblocked;
   return nread;
 }
 
@@ -1169,11 +1222,13 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
   ret=op_parse_url(&_stream->url,_url);
   if(OP_UNLIKELY(ret<0))return ret;
   for(nredirs=0;nredirs<OP_REDIRECT_LIMIT;nredirs++){
-    char        response[OP_RESPONSE_SIZE_MAX];
-    char       *next;
-    char       *status_code;
-    const char *host;
-    unsigned    port;
+    struct timeb  start_time;
+    struct timeb  end_time;
+    char          response[OP_RESPONSE_SIZE_MAX];
+    char         *next;
+    char         *status_code;
+    const char   *host;
+    unsigned      port;
     if(_proxy_host==NULL){
       host=_stream->url.host;
       port=_stream->url.port;
@@ -1217,7 +1272,7 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
       addrs=op_resolve(host,port);
       if(OP_UNLIKELY(addrs==NULL))return OP_FALSE;
     }
-    ret=op_http_connect(_stream,_stream->conns+0,addrs);
+    ret=op_http_connect(_stream,_stream->conns+0,addrs,&start_time);
     if(addrs!=&_stream->addr_info)freeaddrinfo(addrs);
     if(OP_UNLIKELY(ret<0))return ret;
     /*Build the request to send.*/
@@ -1269,6 +1324,8 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
     ret=op_http_conn_read_response(_stream->conns+0,
      response,sizeof(response)/sizeof(*response));
     if(OP_UNLIKELY(ret<0))return ret;
+    ret=ftime(&end_time);
+    OP_ASSERT(!ret);
     next=op_http_parse_status_line(&status_code,response);
     if(next==NULL)return OP_FALSE;
     if(status_code[0]=='2'){
@@ -1358,6 +1415,8 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
       _stream->content_length=content_length;
       _stream->conns[0].pos=0;
       _stream->cur_conni=0;
+      _stream->connect_rate=op_time_diff_ms(&end_time,&start_time);
+      _stream->connect_rate=OP_MAX(_stream->connect_rate,1);
       /*The URL has been successfully opened.*/
       return 0;
     }
@@ -1415,12 +1474,16 @@ static int op_http_stream_open(OpusHTTPStream *_stream,const char *_url,
 
 static int op_http_conn_open_pos(OpusHTTPStream *_stream,
  OpusHTTPConn *_conn,opus_int64 _pos){
-  char        response[OP_RESPONSE_SIZE_MAX];
-  char       *next;
-  char       *status_code;
-  opus_int64  range_length;
-  int         ret;
-  ret=op_http_connect(_stream,_conn,&_stream->addr_info);
+  struct timeb  start_time;
+  struct timeb  end_time;
+  char          response[OP_RESPONSE_SIZE_MAX];
+  char         *next;
+  char         *status_code;
+  opus_int64    range_length;
+  opus_int32    connect_rate;
+  opus_int32    connect_time;
+  int           ret;
+  ret=op_http_connect(_stream,_conn,&_stream->addr_info,&start_time);
   if(OP_UNLIKELY(ret<0))return ret;
   /*Build the request to send.*/
   _stream->request.nbuf=_stream->request_tail;
@@ -1433,6 +1496,8 @@ static int op_http_conn_open_pos(OpusHTTPStream *_stream,
   ret=op_http_conn_read_response(_conn,
    response,sizeof(response)/sizeof(*response));
   if(OP_UNLIKELY(ret<0))return ret;
+  ret=ftime(&end_time);
+  OP_ASSERT(!ret);
   next=op_http_parse_status_line(&status_code,response);
   if(next==NULL)return OP_FALSE;
   /*We _need_ a 206 Partial Content response.*/
@@ -1474,7 +1539,12 @@ static int op_http_conn_open_pos(OpusHTTPStream *_stream,
   _conn->pos=_pos;
   _stream->cur_conni=_conn-_stream->conns;
   OP_ASSERT(_stream->cur_conni>=0&&_stream->cur_conni<OP_NCONNS_MAX);
-  /*The connection has been successfully opened.*/
+  /*The connection has been successfully opened.
+    Update the connection time estimate.*/
+  connect_time=op_time_diff_ms(&end_time,&start_time);
+  connect_rate=_stream->connect_rate;
+  connect_rate+=OP_MAX(connect_time,1)-connect_rate+8>>4;
+  _stream->connect_rate=connect_rate;
   return 0;
 }
 
@@ -1542,12 +1612,12 @@ static size_t op_http_stream_read(void *_ptr,size_t _size,size_t _nmemb,
   return nread;
 }
 
-/*To this will need to be larger than OP_CHUNK_SIZE to be useful.*/
-# define OP_READAHEAD_THRESH (128*1024)
+# define OP_READAHEAD_THRESH_MIN (64*1024)
 /*16 kB is the largest size OpenSSL will return at once.*/
 # define OP_READAHEAD_CHUNK_SIZE (16*1024)
 
 static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
+  struct timeb     seek_time;
   OpusHTTPStream  *stream;
   OpusHTTPConn    *conn;
   OpusHTTPConn    *prev;
@@ -1582,6 +1652,15 @@ static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
     }break;
     default:return -1;
   }
+  /*Mark when we deactivated the active connection.*/
+  if(ci>=0){
+    op_http_conn_read_rate_update(stream->conns+ci);
+    *&seek_time=*&stream->conns[ci].read_time;
+  }
+  else{
+    ret=ftime(&seek_time);
+    OP_ASSERT(!ret);
+  }
   /*If we seeked past the end of the stream, just disable the active
      connection.*/
   if(pos>=content_length){
@@ -1595,11 +1674,24 @@ static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
   conn=stream->lru_head;
   while(conn!=NULL){
     opus_int64 conn_pos;
-    /*TODO: Estimate connection open time and current throughput, and compute
-       the read-ahead threshold accordingly.*/
-    /*TODO: Expire connections aggressively to avoid server timeouts.*/
+    opus_int64 read_ahead_thresh;
+    /*If this connection has been dormant too long, close it.
+      This is to prevent us from hitting server/firewall timeouts.*/
+    if(op_time_diff_ms(&seek_time,&conn->read_time)>5*1000){
+      *pnext=conn->next;
+      conn->next=stream->lru_head;
+      stream->lru_head=conn;
+      op_http_conn_close(stream,conn);
+      conn=*pnext;
+      continue;
+    }
+    /*Dividing by 512 instead of 1000 scales this by nearly 2, biasing towards
+       connection re-use (and roughly compensating for the ability of the TCP
+       window to open up on long reads).*/
+    read_ahead_thresh=OP_MAX(OP_READAHEAD_THRESH_MIN,
+     stream->connect_rate*conn->read_rate>>9);
     conn_pos=conn->pos;
-    if(pos-OP_READAHEAD_THRESH<=conn_pos&&conn_pos<=pos){
+    if(pos-read_ahead_thresh<=conn_pos&&conn_pos<=pos){
       /*Found a suitable connection to re-use.*/
       *pnext=conn->next;
       conn->next=stream->lru_head;
index 7cbf31a..f829a76 100644 (file)
@@ -2008,7 +2008,7 @@ static int op_pcm_seek_page_impl(OggOpusFile *_of,
       OP_ASSERT(!ret);
       /*Take a (pretty decent) guess.*/
       bisect=begin+op_rescale64(diff,diff2,end-begin)-OP_CHUNK_SIZE;
-      if(bisect<begin+OP_CHUNK_SIZE)bisect=begin;
+      if(bisect-OP_CHUNK_SIZE<begin)bisect=begin;
     }
     ret=op_seek_helper(_of,bisect);
     if(OP_UNLIKELY(ret<0))return ret;