Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ type NDTResult struct {
// ndt7
Upload *model.ArchivalData `json:",omitempty"`
Download *model.ArchivalData `json:",omitempty"`
Ping *model.ArchivalData `json:",omitempty"`
}
22 changes: 22 additions & 0 deletions html/ndt7-ping.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* jshint esversion: 6, asi: true, worker: true */
// WebWorker that runs the ndt7 ping test
onmessage = function (ev) {
'use strict'
let url = new URL(ev.data.href)
url.protocol = (url.protocol === 'https:') ? 'wss:' : 'ws:'
url.pathname = '/ndt/v7/ping'
const sock = new WebSocket(url.toString(), 'net.measurementlab.ndt.v7')
sock.onclose = function () {
postMessage(null)
}
sock.onopen = function () {
sock.onmessage = function (ev) {
if (!(ev.data instanceof Blob)) {
let m = JSON.parse(ev.data)
m.Origin = 'server'
m.Test = 'ping'
postMessage(m)
}
}
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ignoring the JavaScript part for now.

25 changes: 24 additions & 1 deletion html/ndt7.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
</head>
<body>
<div>
<div id='ping' class='result row'>[Ping]</div>
<div id='download' class='result row'>[Download]</div>
<div id='upload' class='result row'>[Upload]</div>
<div id='done' class='result row'></div>
</div>
<script type='text/javascript'>
/* jshint esversion: 6, asi: true */
Expand All @@ -49,18 +51,35 @@
}

function runSomething(testName, callback) {
let ws = Number.NaN;
let tcp = Number.NaN;
ndt7core.run(location.href, testName, function(ev, val) {
console.log(ev, val)
if (ev === 'complete') {
if (callback !== undefined) {
callback()
} else {
withElementDo('done', function (elem) {
elem.innerHTML = 'Done.'
})
}
return
}
if (ev === 'measurement' && val.AppInfo !== undefined &&
val.Origin === 'client') {
updateView(testName, val.AppInfo)
}
if (ev === 'measurement' && val.Origin === 'server' && testName === 'ping') {
if (val.WSInfo !== undefined) {
ws = val.WSInfo.MinRTT / 1e3
}
if (val.TCPInfo !== undefined) {
tcp = val.TCPInfo.MinRTT / 1e3
}
withElementDo('ping', function (elem) {
elem.innerHTML = '⓻ ' + ws.toFixed(1) + ' / ⓸ ' + tcp.toFixed(1) + ' ms'
})
}
})
}

Expand All @@ -72,7 +91,11 @@
runSomething('upload', callback)
}

runDownload(function() { runUpload(); })
function runPing(callback) {
runSomething('ping', callback)
}

runPing(function() { runDownload(function() { runUpload(); }); })
</script>
</body>
</html>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ignoring the JavaScript part for now.

1 change: 1 addition & 0 deletions ndt-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func main() {
}
ndt7Mux.Handle(spec.DownloadURLPath, http.HandlerFunc(ndt7Handler.Download))
ndt7Mux.Handle(spec.UploadURLPath, http.HandlerFunc(ndt7Handler.Upload))
ndt7Mux.Handle(spec.PingURLPath, http.HandlerFunc(ndt7Handler.Ping))
ndt7Server := &http.Server{
Addr: *ndt7Addr,
Handler: logging.MakeAccessLogHandler(ndt7Mux),
Expand Down
11 changes: 7 additions & 4 deletions ndt7/download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package download

import (
"context"
"time"

"github.com/gorilla/websocket"
"github.com/m-lab/ndt-server/ndt7/download/sender"
Expand All @@ -15,13 +16,15 @@ import (
// Do implements the download subtest. The ctx argument is the parent
// context for the subtest. The conn argument is the open WebSocket
// connection. The resultfp argument is the file where to save results. Both
// arguments are owned by the caller of this function.
func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) {
// arguments are owned by the caller of this function. The start argument is
// the test start time used to calculate ElapsedTime and deadlines.
func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing in the start time is very useful to start ensuring the same zero is used everywhere.

// Implementation note: use child context so that, if we cannot save the
// results in the loop below, we terminate the goroutines early
wholectx, cancel := context.WithCancel(ctx)
defer cancel()
senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID))
receiverch := receiver.StartDownloadReceiver(wholectx, conn)
measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start)
receiverch, pongch := receiver.StartDownloadReceiver(wholectx, conn, start, measurerch)
senderch := sender.Start(conn, measurerch, start, pongch)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused by the fact that a new channel has been introduced. Isn't it possible to use the measurerch for passing around information? I understood that the PING is another nullable pointer within a Measurement.

saver.SaveAll(resultfp, senderch, receiverch)
}
32 changes: 26 additions & 6 deletions ndt7/download/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) {
return websocket.NewPreparedMessage(websocket.BinaryMessage, data)
}

func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement) {
func loop(
conn *websocket.Conn, src <-chan model.Measurement,
dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo,
) {
logging.Logger.Debug("sender: start")
defer logging.Logger.Debug("sender: stop")
defer close(dst)
defer func() {
for range src {
// make sure we drain the channel
}
for range pongch {
// it should be buffered channel, but let's drain it anyway
}
}()
logging.Logger.Debug("sender: generating random buffer")
bulkMessageSize := 1 << 13
Expand All @@ -38,12 +44,17 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M
logging.Logger.WithError(err).Warn("sender: makePreparedMessage failed")
return
}
deadline := time.Now().Add(spec.MaxRuntime)
deadline := start.Add(spec.MaxRuntime)
err = conn.SetWriteDeadline(deadline) // Liveness!
if err != nil {
logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed")
return
}
// only the first RTT sample taken before flooding the conn is not affected by HOL
if err := ping.SendTicks(conn, start, deadline); err != nil {
logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed")
return
}
var totalSent int64
for {
select {
Expand All @@ -57,10 +68,19 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M
return
}
dst <- m // Liveness: this is blocking
if err := ping.SendTicks(conn, deadline); err != nil {
if err := ping.SendTicks(conn, start, deadline); err != nil {
logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed")
return
}
case wsinfo := <-pongch:
m := model.Measurement{
WSInfo: &wsinfo,
}
if err := conn.WriteJSON(m); err != nil {
logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed")
return
}
dst <- m // Liveness: this is blocking write to log
default:
if err := conn.WritePreparedMessage(preparedMessage); err != nil {
logging.Logger.WithError(err).Warn(
Expand Down Expand Up @@ -99,9 +119,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M
// Liveness guarantee: the sender will not be stuck sending for more then
// the MaxRuntime of the subtest, provided that the consumer will
// continue reading from the returned channel. This is enforced by
// setting the write deadline to Time.Now() + MaxRuntime.
func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement {
// setting the write deadline to |start| + MaxRuntime.
func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement {
dst := make(chan model.Measurement)
go loop(conn, src, dst)
go loop(conn, src, dst, start, pongch)
return dst
}
20 changes: 14 additions & 6 deletions ndt7/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ func warnAndClose(writer http.ResponseWriter, message string) {
// testerFunc is the function implementing a subtest. The first argument
// is the subtest context. The second argument is the connected websocket. The
// third argument is the open file where to write results. This function does
// not own the second or the third argument.
type testerFunc = func(context.Context, *websocket.Conn, *results.File)
// not own the second or the third argument. The fourth argument is the base
// start time of the test.
type testerFunc = func(context.Context, *websocket.Conn, *results.File, time.Time)

// downloadOrUpload implements both download and upload. The writer argument
// is the HTTP response writer. The request argument is the HTTP request
// that we received. The kind argument must be spec.SubtestDownload or
// spec.SubtestUpload. The tester is a function actually implementing the
// requested ndt7 subtest.
// that we received. The kind argument must be spec.SubtestDownload,
// spec.SubtestUpload, or SubtestPing. The tester is a function actually
// implementing the requested ndt7 subtest.
func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Request, kind spec.SubtestKind, tester testerFunc) {
logging.Logger.Debug("downloadOrUpload: upgrading to WebSockets")
if request.Header.Get("Sec-WebSocket-Protocol") != spec.SecWebSocketProtocol {
Expand Down Expand Up @@ -106,6 +107,8 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ
result.Download = resultfp.Data
} else if kind == spec.SubtestUpload {
result.Upload = resultfp.Data
} else if kind == spec.SubtestPing {
result.Ping = resultfp.Data
} else {
logging.Logger.Warn(string(kind) + ": data not saved")
}
Expand All @@ -114,7 +117,7 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ
}
warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error")
}()
tester(request.Context(), conn, resultfp)
tester(request.Context(), conn, resultfp, result.StartTime)
}

// Download handles the download subtest.
Expand All @@ -126,3 +129,8 @@ func (h Handler) Download(writer http.ResponseWriter, request *http.Request) {
func (h Handler) Upload(writer http.ResponseWriter, request *http.Request) {
h.downloadOrUpload(writer, request, spec.SubtestUpload, upload.Do)
}

// Ping handles the ping subtest.
func (h Handler) Ping(writer http.ResponseWriter, request *http.Request) {
h.downloadOrUpload(writer, request, spec.SubtestPing, upload.Do)
}
7 changes: 3 additions & 4 deletions ndt7/measurer/measurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Durat
}
}

func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement) {
func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement, start time.Time) {
logging.Logger.Debug("measurer: start")
defer logging.Logger.Debug("measurer: stop")
defer close(dst)
Expand All @@ -66,7 +66,6 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod
return
}
defer sockfp.Close()
start := time.Now()
connectionInfo := &model.ConnectionInfo{
Client: conn.RemoteAddr().String(),
Server: conn.LocalAddr().String(),
Expand Down Expand Up @@ -104,9 +103,9 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod
// a timeout of DefaultRuntime seconds, provided that the consumer
// continues reading from the returned channel.
func Start(
ctx context.Context, conn *websocket.Conn, UUID string,
ctx context.Context, conn *websocket.Conn, UUID string, start time.Time,
) <-chan model.Measurement {
dst := make(chan model.Measurement)
go loop(ctx, conn, UUID, dst)
go loop(ctx, conn, UUID, dst, start)
return dst
}
1 change: 1 addition & 0 deletions ndt7/model/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ type Measurement struct {
ConnectionInfo *ConnectionInfo `json:",omitempty" bigquery:"-"`
BBRInfo *BBRInfo `json:",omitempty"`
TCPInfo *TCPInfo `json:",omitempty"`
WSInfo *WSInfo `json:",omitempty"`
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like PingInfo to me.

}
10 changes: 10 additions & 0 deletions ndt7/model/wsinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package model

// WSInfo contains an application level (websocket) ping measurement data.
// It may be melded into AppInfo.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I'd rather actually keep them separate

// FIXME: describe this structure is in the ndt7 specification.
type WSInfo struct {
ElapsedTime int64
LastRTT int64 // TCPInfo.RTT is smoothed RTT, LastRTT is just a sample.
MinRTT int64
}
21 changes: 12 additions & 9 deletions ndt7/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,33 @@ package ping

import (
"encoding/json"
"errors"
"time"

"github.com/gorilla/websocket"
)

// SendTicks sends the current ticks as a ping message.
func SendTicks(conn *websocket.Conn, deadline time.Time) error {
// TODO(bassosimone): when we'll have a unique base time.Time reference for
// the whole test, we should use that, since UnixNano() is not monotonic.
ticks := int64(time.Now().UnixNano())
func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error {
var ticks int64 = time.Since(start).Nanoseconds()
data, err := json.Marshal(ticks)
if err == nil {
err = conn.WriteControl(websocket.PingMessage, data, deadline)
}
return err
}

func ParseTicks(s string) (d int64, err error) {
// TODO(bassosimone): when we'll have a unique base time.Time reference for
// the whole test, we should use that, since UnixNano() is not monotonic.
func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) {
elapsed = time.Since(start)
var prev int64
err = json.Unmarshal([]byte(s), &prev)
if err == nil {
d = (int64(time.Now().UnixNano()) - prev)
if err != nil {
return
}
if 0 <= prev && prev <= elapsed.Nanoseconds() {
d = time.Duration(elapsed.Nanoseconds() - prev)
} else {
err = errors.New("RTT is negative")
}
return
}
Loading