engine: resources: The http proxy should support streaming files

This adds basic support for streaming files directly from the download
server. This avoids clients timing out if they are blocked while first
waiting for a giant file to download.
This commit is contained in:
James Shubin
2024-03-13 16:56:49 -04:00
parent cf49d9f784
commit e0ace35525

View File

@@ -34,10 +34,12 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"mime"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -150,7 +152,7 @@ func (obj *HTTPProxyRes) getPath() string {
// serveHTTP is the real implementation of ServeHTTP, but with a more ergonomic // serveHTTP is the real implementation of ServeHTTP, but with a more ergonomic
// signature. // signature.
func (obj *HTTPProxyRes) serveHTTP(ctx context.Context, requestPath string) (http.HandlerFunc, error) { func (obj *HTTPProxyRes) serveHTTP(ctx context.Context, requestPath string) (handlerFuncError, error) {
// TODO: switch requestPath to use safepath.AbsPath instead of a string // TODO: switch requestPath to use safepath.AbsPath instead of a string
result, err := obj.pathParser.parse(requestPath) result, err := obj.pathParser.parse(requestPath)
@@ -180,12 +182,6 @@ func (obj *HTTPProxyRes) serveHTTP(ctx context.Context, requestPath string) (htt
proxyURL := result.proxyURL proxyURL := result.proxyURL
// XXX: consider streaming the download into both the client requesting
// it indirectly through this proxy, and also into the cache if we want
// to store the file. This lets us not OOM on large files. The downside
// is this is more complicated to do and we can't guarantee no partial
// files from our side. Gate this behind a flag. Worry about timeouts!
// FIXME: should we be using a different client? // FIXME: should we be using a different client?
client := http.DefaultClient client := http.DefaultClient
request, err := http.NewRequestWithContext(ctx, http.MethodGet, proxyURL, nil) // (*Request, error) request, err := http.NewRequestWithContext(ctx, http.MethodGet, proxyURL, nil) // (*Request, error)
@@ -195,74 +191,139 @@ func (obj *HTTPProxyRes) serveHTTP(ctx context.Context, requestPath string) (htt
// TODO: add a progress logf... // TODO: add a progress logf...
obj.init.Logf("get: %s", proxyURL) obj.init.Logf("get: %s", proxyURL)
response, err := client.Do(request) // (*Response, error)
if err != nil {
return nil, err
}
if response.StatusCode != http.StatusOK { // 200 return func(w http.ResponseWriter, req *http.Request) error {
return nil, &httpError{
//http.StatusText(http.StatusCode) // Tell the client right away, that we're working on things...
msg: fmt.Sprintf("bad status: %d", response.StatusCode), // TODO: Is this valuable to give us more time to block?
code: http.StatusNotFound, // 404 w.WriteHeader(http.StatusProcessing) // http 102, RFC 2518, 10.1
response, err := client.Do(request) // (*Response, error)
if err != nil {
return err
} }
} defer response.Body.Close() // free
// Determine the last-modified time if we can. if response.StatusCode != http.StatusOK { // 200
modtime := time.Now() return &httpError{
//http.StatusText(http.StatusCode)
key := http.CanonicalHeaderKey("Last-Modified") msg: fmt.Sprintf("bad status: %d", response.StatusCode),
if headers, exists := response.Header[key]; exists && len(headers) > 0 { // []string code: http.StatusNotFound, // 404
text := headers[len(headers)-1] // last element }
lastModified, err := http.ParseTime(text) // (time.Time, error)
if err == nil {
modtime = lastModified
} }
}
// response.Body is an io.ReadCloser // Determine the last-modified time if we can.
data, err := io.ReadAll(response.Body) modtime := time.Now()
response.Body.Close() // free
if err != nil {
return nil, err
}
// TODO: consider doing something like this to stream
//reader := response.Body
if obj.Cache != "" { // check in the cache... key := http.CanonicalHeaderKey("Last-Modified")
httpProxyRWMutex.Lock() if headers, exists := response.Header[key]; exists && len(headers) > 0 { // []string
defer httpProxyRWMutex.Unlock() text := headers[len(headers)-1] // last element
// TODO: consider doing something like this to stream lastModified, err := http.ParseTime(text) // (time.Time, error)
//reader = io.TeeReader(reader, writer) if err == nil {
modtime = lastModified
// store in cachePath }
if err := os.MkdirAll(filepath.Dir(cachePath), 0700); err != nil {
return nil, err
} }
// TODO: use ctx
if err := os.WriteFile(cachePath, data, 0600); err != nil {
return nil, err
}
// store the last modified time if set
// TODO: is there a file last-modified-time precision issue if
// we use this value in a future HTTP If-Modified-Since header?
if err := os.Chtimes(cachePath, time.Time{}, modtime); err != nil {
return nil, err
}
}
handle := bytes.NewReader(data) // We stream the download into both the client requesting it
// indirectly through this proxy, and also into the cache if we
// want to store the file. This lets us not OOM on large files.
// This is necessary because otherwise a client would get bored
// of waiting for us to download large files before sending them
// off. One downside is we haven't implemented range requests
// that http.ServeContent did for us magically, and we also
// currently might not do all of the http/header magic it did.
var copyError error
writers := []io.Writer{w} // out to the client
if obj.Cache != "" { // check in the cache...
httpProxyRWMutex.Lock()
defer httpProxyRWMutex.Unlock()
// store in cachePath
if err := os.MkdirAll(filepath.Dir(cachePath), 0700); err != nil {
return err
}
// store the last modified time if set
// TODO: is there a file last-modified-time precision
// issue if we use this value in a future HTTP
// If-Modified-Since header?
defer func() { // Do this after the file closes
if copyError != nil {
return // skip if we have a copy error
}
if err := os.Chtimes(cachePath, time.Time{}, modtime); err != nil {
// TODO: what do we do here?
obj.init.Logf("could not chtimes: %s", cachePath)
}
}()
// TODO: use ctx
out, err := os.Create(cachePath)
if err != nil {
return err
}
defer out.Close()
writers = append(writers, out)
// TODO: consider doing something like this to stream?
//reader = io.TeeReader(reader, writer)
}
return func(w http.ResponseWriter, req *http.Request) {
requestPath := req.URL.Path // TODO: is this what we want here? requestPath := req.URL.Path // TODO: is this what we want here?
http.ServeContent(w, req, requestPath, modtime, handle)
if !isZeroTime(modtime) { // from: src/net/http/fs.go:setLastModified
w.Header().Set("Last-Modified", modtime.UTC().Format(http.TimeFormat))
}
if obj.init.Debug || true {
obj.init.Logf("got content-length: %d bytes", response.ContentLength)
}
if size := response.ContentLength; size > 0 {
w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
}
// Use the file's extension to find the ctype, skip if we would
// need to seek into the file to read some header content bytes.
// TODO: Is this sufficient for setting Content-Type ?
if ctype := mime.TypeByExtension(filepath.Ext(requestPath)); ctype != "" {
w.Header().Set("Content-Type", ctype)
}
w.WriteHeader(http.StatusOK) // Tell client everything is ok.
//if progressBar {
// writers = append(writers, progress)
//}
writer := io.MultiWriter(writers...)
if len(writers) == 1 {
writer = writers[0] // simplify
}
if _, err := io.Copy(writer, response.Body); err != nil {
if obj.Cache != "" { // check in the cache...
// We already took the mutex earlier!
obj.init.Logf("removing a partial file: %s", cachePath)
if err := os.Remove(cachePath); err != nil {
// TODO: what do we do here?
obj.init.Logf("could not remove: %s", cachePath)
}
}
// Even if we have an error, it's too late to error.
//return err // superfluous response.WriteHeader call
copyError = err // store for defer
return nil
}
//obj.init.Logf("%d bytes sent", n) // XXX: how do we know (on the server-side) if it worked? //obj.init.Logf("%d bytes sent", n) // XXX: how do we know (on the server-side) if it worked?
return nil
}, nil }, nil
} }
// getCachedFile pulls a file from our local cache if it exists. It returns the // getCachedFile pulls a file from our local cache if it exists. It returns the
// correct http handler on success, which we can then run. // correct http handler on success, which we can then run.
func (obj *HTTPProxyRes) getCachedFile(ctx context.Context, absPath string) (http.HandlerFunc, error) { func (obj *HTTPProxyRes) getCachedFile(ctx context.Context, absPath string) (handlerFuncError, error) {
// TODO: if infinite reads keep coming in, do we indefinitely-postpone // TODO: if infinite reads keep coming in, do we indefinitely-postpone
// the locking so that a new file can be saved in the cache? // the locking so that a new file can be saved in the cache?
httpProxyRWMutex.RLock() httpProxyRWMutex.RLock()
@@ -289,10 +350,11 @@ func (obj *HTTPProxyRes) getCachedFile(ctx context.Context, absPath string) (htt
handle := bytes.NewReader(data) // buffer for mutex handle := bytes.NewReader(data) // buffer for mutex
return func(w http.ResponseWriter, req *http.Request) { return func(w http.ResponseWriter, req *http.Request) error {
requestPath := req.URL.Path // TODO: is this what we want here? requestPath := req.URL.Path // TODO: is this what we want here?
http.ServeContent(w, req, requestPath, modtime, handle) http.ServeContent(w, req, requestPath, modtime, handle)
//obj.init.Logf("%d bytes sent", n) // XXX: how do we know (on the server-side) if it worked? //obj.init.Logf("%d bytes sent", n) // XXX: how do we know (on the server-side) if it worked?
return nil
}, nil }, nil
} }
@@ -348,7 +410,11 @@ func (obj *HTTPProxyRes) ServeHTTP(w http.ResponseWriter, req *http.Request) {
sendHTTPError(w, err) sendHTTPError(w, err)
return return
} }
fn(w, req) if err := fn(w, req); err != nil {
obj.init.Logf("error: %s", err)
sendHTTPError(w, err)
return
}
//obj.init.Logf("%d bytes sent", n) // XXX: how do we know (on the server-side) if it worked? //obj.init.Logf("%d bytes sent", n) // XXX: how do we know (on the server-side) if it worked?
} }
@@ -561,3 +627,14 @@ func (obj *pathParser) parse(requestPath string) (*pathResult, error) {
return result, nil return result, nil
} }
// handlerFuncError is http.HandlerFunc, but with a more ergonomic signature.
type handlerFuncError func(http.ResponseWriter, *http.Request) error
// isZeroTime reports whether t is obviously unspecified (either zero or
// Unix()=0). Adapted from: src/net/http/fs.go
func isZeroTime(t time.Time) bool {
var unixEpochTime = time.Unix(0, 0)
return t.IsZero() || t.Equal(unixEpochTime)
}