From 35638cb82f76871224343584c6911df61e0c4e6b Mon Sep 17 00:00:00 2001 From: Jose Garcia Date: Mon, 28 Feb 2022 08:22:09 -0500 Subject: [PATCH] Update tests for serverSharing --- pkg/liveshare/port_forwarder_test.go | 16 ++++++++++++++-- pkg/liveshare/ports.go | 4 ++-- pkg/liveshare/rpc.go | 4 ++-- pkg/liveshare/session_test.go | 23 +++++++++++++++++++++-- 4 files changed, 39 insertions(+), 8 deletions(-) diff --git a/pkg/liveshare/port_forwarder_test.go b/pkg/liveshare/port_forwarder_test.go index 0923847e0..f68c11aba 100644 --- a/pkg/liveshare/port_forwarder_test.go +++ b/pkg/liveshare/port_forwarder_test.go @@ -28,7 +28,13 @@ func TestNewPortForwarder(t *testing.T) { func TestPortForwarderStart(t *testing.T) { streamName, streamCondition := "stream-name", "stream-condition" + port := 8000 + sendNotification := make(chan PortUpdate) serverSharing := func(req *jsonrpc2.Request) (interface{}, error) { + sendNotification <- PortUpdate{ + Port: int(port), + ChangeKind: PortChangeKindStart, + } return Port{StreamName: streamName, StreamCondition: streamCondition}, nil } getStream := func(req *jsonrpc2.Request) (interface{}, error) { @@ -55,10 +61,16 @@ func TestPortForwarderStart(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + go func() { + testServer.WriteToObjectStream(rpcPortTestMessage{ + Method: "serverSharing.sharingSucceeded", + Params: <-sendNotification, + }) + }() + done := make(chan error) go func() { - const name, remote = "ssh", 8000 - done <- NewPortForwarder(session, name, remote, false).ForwardToListener(ctx, listen) + done <- NewPortForwarder(session, "ssh", port, false).ForwardToListener(ctx, listen) }() go func() { diff --git a/pkg/liveshare/ports.go b/pkg/liveshare/ports.go index 340a74554..9f9bd36f1 100644 --- a/pkg/liveshare/ports.go +++ b/pkg/liveshare/ports.go @@ -85,8 +85,8 @@ func (s *Session) WaitForPortNotification(ctx context.Context, port int, notifTy notificationUpdate := make(chan PortNotification, 1) errc := make(chan error, 1) - h := func(success bool) func(*jsonrpc2.Request) { - return func(req *jsonrpc2.Request) { + h := func(success bool) func(*jsonrpc2.Conn, *jsonrpc2.Request) { + return func(conn *jsonrpc2.Conn, req *jsonrpc2.Request) { var notification PortNotification if err := json.Unmarshal(*req.Params, ¬ification); err != nil { errc <- fmt.Errorf("error unmarshaling notification: %w", err) diff --git a/pkg/liveshare/rpc.go b/pkg/liveshare/rpc.go index ee0c196c4..a32d8507a 100644 --- a/pkg/liveshare/rpc.go +++ b/pkg/liveshare/rpc.go @@ -42,7 +42,7 @@ func (r *rpcClient) do(ctx context.Context, method string, args, result interfac return waiter.Wait(waitCtx, result) } -type handlerFn func(req *jsonrpc2.Request) +type handlerFn func(conn *jsonrpc2.Conn, req *jsonrpc2.Request) type requestHandler struct { handlersMu sync.RWMutex @@ -73,6 +73,6 @@ func (r *requestHandler) handlerFn(requestType string) []handlerFn { func (r *requestHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { for _, handler := range r.handlerFn(req.Method) { - go handler(req) + go handler(conn, req) } } diff --git a/pkg/liveshare/session_test.go b/pkg/liveshare/session_test.go index 44cb2357b..cfdfa5815 100644 --- a/pkg/liveshare/session_test.go +++ b/pkg/liveshare/session_test.go @@ -49,8 +49,14 @@ func makeMockSession(opts ...livesharetest.ServerOption) (*livesharetest.Server, return testServer, session, nil } +type rpcPortTestMessage struct { + Method string + Params PortUpdate +} + func TestServerStartSharing(t *testing.T) { serverPort, serverProtocol := 2222, "sshd" + sendNotification := make(chan PortUpdate) startSharing := func(req *jsonrpc2.Request) (interface{}, error) { var args []interface{} if err := json.Unmarshal(*req.Params, &args); err != nil { @@ -59,9 +65,11 @@ func TestServerStartSharing(t *testing.T) { if len(args) < 3 { return nil, errors.New("not enough arguments to start sharing") } - if port, ok := args[0].(float64); !ok { + port, ok := args[0].(float64) + if !ok { return nil, errors.New("port argument is not an int") - } else if port != float64(serverPort) { + } + if port != float64(serverPort) { return nil, errors.New("port does not match serverPort") } if protocol, ok := args[1].(string); !ok { @@ -74,6 +82,10 @@ func TestServerStartSharing(t *testing.T) { } else if browseURL != fmt.Sprintf("http://localhost:%d", serverPort) { return nil, errors.New("browseURL does not match expected") } + sendNotification <- PortUpdate{ + Port: int(port), + ChangeKind: PortChangeKindStart, + } return Port{StreamName: "stream-name", StreamCondition: "stream-condition"}, nil } testServer, session, err := makeMockSession( @@ -86,6 +98,13 @@ func TestServerStartSharing(t *testing.T) { } ctx := context.Background() + go func() { + testServer.WriteToObjectStream(rpcPortTestMessage{ + Method: "serverSharing.sharingSucceeded", + Params: <-sendNotification, + }) + }() + done := make(chan error) go func() { streamID, err := session.startSharing(ctx, serverProtocol, serverPort)