diff --git a/pkg/cmd/codespace/ports.go b/pkg/cmd/codespace/ports.go index 76ad97d74..a717c339d 100644 --- a/pkg/cmd/codespace/ports.go +++ b/pkg/cmd/codespace/ports.go @@ -18,6 +18,7 @@ import ( "github.com/cli/cli/v2/pkg/liveshare" "github.com/cli/cli/v2/utils" "github.com/muhammadmuzzammil1998/jsonc" + "github.com/sourcegraph/jsonrpc2" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" ) @@ -273,8 +274,19 @@ func (a *App) UpdatePortVisibility(ctx context.Context, codespaceName string, ar } defer safeClose(session, &err) - success := session.RegisterEvent("serverSharing.sharingSucceeded") - failure := session.RegisterEvent("serverSharing.sharingFailed") + notificationUpdate := make(chan portUpdateNotification) + h := func(success bool) func(*jsonrpc2.Request) { + return func(req *jsonrpc2.Request) { + var notification portUpdateNotification + if err := json.Unmarshal(*req.Params, ¬ification); err != nil { + return + } + notification.success = success + notificationUpdate <- notification + } + } + session.RegisterRequestHandler("serverSharing.sharingSucceeded", h(true)) + session.RegisterRequestHandler("serverSharing.sharingFailed", h(false)) // TODO: check if port visibility can be updated in parallel instead of sequentially for _, port := range ports { @@ -289,7 +301,7 @@ func (a *App) UpdatePortVisibility(ctx context.Context, codespaceName string, ar ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - if err := a.waitForPortUpdate(ctx, success, failure, session, port.number); err != nil { + if err := a.waitForPortUpdate(ctx, notificationUpdate, session, port.number); err != nil { return newErrUpdatingPortVisibility(port.number, port.visibility, err) } @@ -299,43 +311,30 @@ func (a *App) UpdatePortVisibility(ctx context.Context, codespaceName string, ar return nil } -type portChangeKind string - -const ( - portChangeKindUpdate portChangeKind = "update" -) - -type portData struct { - Port int `json:"port"` - ChangeKind portChangeKind `json:"changeKind"` - ErrorDetail string `json:"errorDetail"` - StatusCode int `json:"statusCode"` +type portUpdateNotification struct { + liveshare.PortUpdate + success bool } var errUpdatePortVisibilityForbidden = errors.New("organization admin has forbidden this privacy setting") -func (a *App) waitForPortUpdate(ctx context.Context, success, failure chan []byte, session *liveshare.Session, port int) error { +func (a *App) waitForPortUpdate(ctx context.Context, n chan portUpdateNotification, session *liveshare.Session, port int) error { for { - var pd portData select { case <-ctx.Done(): return fmt.Errorf("timeout waiting for server sharing to succeed or fail") - case b := <-success: - if err := json.Unmarshal(b, &pd); err != nil { - return fmt.Errorf("error unmarshaling port data: %w", err) - } - if pd.Port == port && pd.ChangeKind == portChangeKindUpdate { - return nil - } - case b := <-failure: - if err := json.Unmarshal(b, &pd); err != nil { - return fmt.Errorf("error unmarshaling port data: %w", err) - } - if pd.Port == port && pd.ChangeKind == portChangeKindUpdate { - if pd.StatusCode == http.StatusForbidden { - return errUpdatePortVisibilityForbidden + case update := <-n: + if update.success { + if update.Port == port && update.ChangeKind == liveshare.PortChangeKindUpdate { + return nil + } + } else { + if update.Port == port && update.ChangeKind == liveshare.PortChangeKindUpdate { + if update.StatusCode == http.StatusForbidden { + return errUpdatePortVisibilityForbidden + } + return errors.New(update.ErrorDetail) } - return errors.New(pd.ErrorDetail) } } } diff --git a/pkg/cmd/codespace/ports_test.go b/pkg/cmd/codespace/ports_test.go index 9f4a3f839..e41e6feed 100644 --- a/pkg/cmd/codespace/ports_test.go +++ b/pkg/cmd/codespace/ports_test.go @@ -9,6 +9,7 @@ import ( "github.com/cli/cli/v2/internal/codespaces/api" "github.com/cli/cli/v2/pkg/iostreams" + "github.com/cli/cli/v2/pkg/liveshare" livesharetest "github.com/cli/cli/v2/pkg/liveshare/test" "github.com/sourcegraph/jsonrpc2" ) @@ -26,18 +27,24 @@ func TestPortsUpdateVisibilitySuccess(t *testing.T) { } eventResponses := []string{ - "sharingSucceeded", - "sharingSucceeded", + "serverSharing.sharingSucceeded", + "serverSharing.sharingSucceeded", } - portsData := []portData{ + portsData := []portUpdateNotification{ { - Port: 80, - ChangeKind: portChangeKindUpdate, + success: true, + PortUpdate: liveshare.PortUpdate{ + Port: 80, + ChangeKind: liveshare.PortChangeKindUpdate, + }, }, { - Port: 9999, - ChangeKind: portChangeKindUpdate, + success: true, + PortUpdate: liveshare.PortUpdate{ + Port: 9999, + ChangeKind: liveshare.PortChangeKindUpdate, + }, }, } @@ -61,20 +68,26 @@ func TestPortsUpdateVisibilityFailure403(t *testing.T) { } eventResponses := []string{ - "sharingSucceeded", - "sharingFailed", + "serverSharing.sharingSucceeded", + "serverSharing.sharingFailed", } - portsData := []portData{ + portsData := []portUpdateNotification{ { - Port: 80, - ChangeKind: portChangeKindUpdate, + success: true, + PortUpdate: liveshare.PortUpdate{ + Port: 80, + ChangeKind: liveshare.PortChangeKindUpdate, + }, }, { - Port: 9999, - ChangeKind: portChangeKindUpdate, - ErrorDetail: "test error", - StatusCode: 403, + success: false, + PortUpdate: liveshare.PortUpdate{ + Port: 9999, + ChangeKind: liveshare.PortChangeKindUpdate, + ErrorDetail: "test error", + StatusCode: 403, + }, }, } @@ -101,19 +114,25 @@ func TestPortsUpdateVisibilityFailure(t *testing.T) { } eventResponses := []string{ - "sharingSucceeded", - "sharingFailed", + "serverSharing.sharingSucceeded", + "serverSharing.sharingFailed", } - portsData := []portData{ + portsData := []portUpdateNotification{ { - Port: 80, - ChangeKind: portChangeKindUpdate, + success: true, + PortUpdate: liveshare.PortUpdate{ + Port: 80, + ChangeKind: liveshare.PortChangeKindUpdate, + }, }, { - Port: 9999, - ChangeKind: portChangeKindUpdate, - ErrorDetail: "test error", + success: false, + PortUpdate: liveshare.PortUpdate{ + Port: 9999, + ChangeKind: liveshare.PortChangeKindUpdate, + ErrorDetail: "test error", + }, }, } @@ -132,7 +151,7 @@ type joinWorkspaceResult struct { SessionNumber int `json:"sessionNumber"` } -func runUpdateVisibilityTest(portVisibilities []portVisibility, eventResponses []string, portsData []portData) error { +func runUpdateVisibilityTest(portVisibilities []portVisibility, eventResponses []string, portsData []portUpdateNotification) error { joinWorkspace := func(req *jsonrpc2.Request) (interface{}, error) { return joinWorkspaceResult{1}, nil } @@ -163,7 +182,7 @@ func runUpdateVisibilityTest(portVisibilities []portVisibility, eventResponses [ type rpcMessage struct { Method string - Params portData + Params liveshare.PortUpdate } go func() { @@ -174,14 +193,10 @@ func runUpdateVisibilityTest(portVisibilities []portVisibility, eventResponses [ return case <-ch: pd := portsData[i] - // TODO: handle error - err := testServer.WriteToObjectStream(rpcMessage{ + _ := testServer.WriteToObjectStream(rpcMessage{ Method: eventResponses[i], - Params: pd, + Params: pd.PortUpdate, }) - if err != nil { - panic(err) - } } } }() diff --git a/pkg/liveshare/rpc.go b/pkg/liveshare/rpc.go index b5d520313..293cc4b00 100644 --- a/pkg/liveshare/rpc.go +++ b/pkg/liveshare/rpc.go @@ -42,41 +42,39 @@ func (r *rpcClient) do(ctx context.Context, method string, args, result interfac return waiter.Wait(waitCtx, result) } +type handlerFn func(req *jsonrpc2.Request) + type requestHandler struct { - eventHandlersMu sync.RWMutex - eventHandlers map[string]chan []byte + handlersMu sync.RWMutex + handlers map[string]handlerFn } func newRequestHandler() *requestHandler { - return &requestHandler{eventHandlers: make(map[string]chan []byte)} + return &requestHandler{handlers: make(map[string]handlerFn)} } -func (r *requestHandler) registerEvent(eventName string) chan []byte { - r.eventHandlersMu.Lock() - defer r.eventHandlersMu.Unlock() +func (r *requestHandler) register(requestType string, handler handlerFn) { + r.handlersMu.Lock() + defer r.handlersMu.Unlock() - if ch, ok := r.eventHandlers[eventName]; ok { - return ch + if _, ok := r.handlers[requestType]; ok { + return } - ch := make(chan []byte) - r.eventHandlers[eventName] = ch - return ch + r.handlers[requestType] = handler } -func (r *requestHandler) eventHandler(eventName string) chan []byte { - r.eventHandlersMu.RLock() - defer r.eventHandlersMu.RUnlock() +func (r *requestHandler) handler(requestType string) handlerFn { + r.handlersMu.RLock() + defer r.handlersMu.RUnlock() - return r.eventHandlers[eventName] + return r.handlers[requestType] } func (r *requestHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { - fmt.Println(req.Method) - handler := r.eventHandler(req.Method) - if handler == nil { - return // noop + if handler := r.handler(req.Method); handler != nil { + go func() { + handler(req) + }() } - - handler <- *req.Params } diff --git a/pkg/liveshare/rpc_test.go b/pkg/liveshare/rpc_test.go deleted file mode 100644 index 0f78b8780..000000000 --- a/pkg/liveshare/rpc_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package liveshare - -import ( - "context" - "encoding/json" - "fmt" - "net" - "testing" - - "github.com/sourcegraph/jsonrpc2" -) - -func TestRequestHandler(t *testing.T) { - r, w := net.Pipe() - client := newRPCClient(r) - - ctx := context.Background() - client.connect(ctx) - - type params struct { - Data string `json:"data"` - } - - ev := client.registerEventHandler("testEvent") - done := make(chan error) - go func() { - b := <-ev - var receivedParams params - if err := json.Unmarshal(b, &receivedParams); err != nil { - done <- err - return - } - if receivedParams.Data != "test" { - done <- fmt.Errorf("expected test, got %q", receivedParams.Data) - } - done <- nil - }() - - go func() { - codec := jsonrpc2.VSCodeObjectCodec{} - type message struct { - Method string `json:"method"` - Params params `json:"params"` - } - codec.WriteObject(w, message{ - Method: "testEvent", - Params: params{"test"}, - }) - }() - - err := <-done - if err != nil { - t.Fatal(err) - } -} diff --git a/pkg/liveshare/session.go b/pkg/liveshare/session.go index 25e2143ab..9095b2f0d 100644 --- a/pkg/liveshare/session.go +++ b/pkg/liveshare/session.go @@ -44,6 +44,19 @@ type Port struct { Privacy string `json:"privacy"` } +type PortChangeKind string + +const ( + PortChangeKindUpdate PortChangeKind = "update" +) + +type PortUpdate struct { + Port int `json:"port"` + ChangeKind PortChangeKind `json:"changeKind"` + ErrorDetail string `json:"errorDetail"` + StatusCode int `json:"statusCode"` +} + // startSharing tells the Live Share host to start sharing the specified port from the container. // The sessionName describes the purpose of the remote port or service. // It returns an identifier that can be used to open an SSH channel to the remote port. @@ -78,8 +91,12 @@ func (s *Session) UpdateSharedServerPrivacy(ctx context.Context, port int, visib return nil } -func (s *Session) RegisterEvent(eventName string) chan []byte { - return s.rpc.requestHandler.registerEvent(eventName) +// RegisterRequestHandler allows the caller to register a jsonrpc request handler +// for a given request type. The handler will be called when the request is received +// by the session's RPC server. If the request type has already been registered, the function will +// noop. +func (s *Session) RegisterRequestHandler(requestType string, h handlerFn) { + s.rpc.requestHandler.register(requestType, h) } // StartsSSHServer starts an SSH server in the container, installing sshd if necessary,