diff --git a/pkg/cmd/codespace/ports.go b/pkg/cmd/codespace/ports.go index 094833e30..4f35b33ce 100644 --- a/pkg/cmd/codespace/ports.go +++ b/pkg/cmd/codespace/ports.go @@ -7,8 +7,10 @@ import ( "errors" "fmt" "net" + "net/http" "strconv" "strings" + "time" "github.com/cli/cli/v2/internal/codespaces" "github.com/cli/cli/v2/internal/codespaces/api" @@ -253,6 +255,15 @@ func (a *App) UpdatePortVisibility(ctx context.Context, codespaceName string, ar for _, port := range ports { a.StartProgressIndicatorWithLabel(fmt.Sprintf("Updating port %d visibility to: %s", port.number, port.visibility)) err := session.UpdateSharedServerPrivacy(ctx, port.number, port.visibility) + + // wait for succeed or failure + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + if err := a.waitForPortUpdate(ctx, session, port.number); err != nil { + return fmt.Errorf("error waiting for port update: %w", err) + } + a.StopProgressIndicator() if err != nil { return fmt.Errorf("error update port to public: %w", err) @@ -262,6 +273,48 @@ func (a *App) UpdatePortVisibility(ctx context.Context, codespaceName string, ar return nil } +type portChangeKind string + +const ( + portChangeKindUpdate portChangeKind = "update" +) + +type portData struct { + Port int `json:"port"` + ChangeKind portChangeKind `json:"changeKind"` + ErrorDetail string `json:"errorDetail"` + StatusCode int `json:"statusCode"` +} + +func (a *App) waitForPortUpdate(ctx context.Context, session *liveshare.Session, port int) error { + success := session.WaitForEvent("sharingSucceeded") + failure := session.WaitForEvent("sharingFailed") + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for server sharing to succeed or fail") + case b := <-success: + if err := json.Unmarshal(b, &portData); err != nil { + return fmt.Errorf("error unmarshaling port data: %w", err) + } + if portData.Port == port && portData.ChangeKind == portChangeKindUpdate { + return nil + } + case b := <-failure: + if err := json.Unmarshal(b, &portData); err != nil { + return fmt.Errorf("error unmarshaling port data: %w", err) + } + if portData.Port == port && portData.ChangeKind == portChangeKindUpdate { + if portData.StatusCode == http.StatusForbidden { + return errors.New("organization admin has forbidden this privacy setting") + } + return errors.New(portData.ErrorDetail) + } + } + } +} + type portVisibility struct { number int visibility string diff --git a/pkg/liveshare/port_forwarder.go b/pkg/liveshare/port_forwarder.go index 2649abd3c..546201f12 100644 --- a/pkg/liveshare/port_forwarder.go +++ b/pkg/liveshare/port_forwarder.go @@ -97,6 +97,9 @@ func (fwd *PortForwarder) shareRemotePort(ctx context.Context) (channelID, error if err != nil { err = fmt.Errorf("failed to share remote port %d: %w", fwd.remotePort, err) } + + // wait for port change kind start + return id, err } diff --git a/pkg/liveshare/rpc.go b/pkg/liveshare/rpc.go index 4ab8fbb88..8c411f1ff 100644 --- a/pkg/liveshare/rpc.go +++ b/pkg/liveshare/rpc.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sync" "time" "github.com/opentracing/opentracing-go" @@ -13,15 +14,18 @@ import ( type rpcClient struct { *jsonrpc2.Conn conn io.ReadWriteCloser + + eventHandlersMu sync.RWMutex + eventHandlers map[string]chan []byte } func newRPCClient(conn io.ReadWriteCloser) *rpcClient { - return &rpcClient{conn: conn} + return &rpcClient{conn: conn, eventHandlers: make(map[string]chan []byte)} } func (r *rpcClient) connect(ctx context.Context) { stream := jsonrpc2.NewBufferedStream(r.conn, jsonrpc2.VSCodeObjectCodec{}) - r.Conn = jsonrpc2.NewConn(ctx, stream, nullHandler{}) + r.Conn = jsonrpc2.NewConn(ctx, stream, newRequestHandler(r)) } func (r *rpcClient) do(ctx context.Context, method string, args, result interface{}) error { @@ -40,7 +44,43 @@ func (r *rpcClient) do(ctx context.Context, method string, args, result interfac return waiter.Wait(waitCtx, result) } -type nullHandler struct{} +func (r *rpcClient) registerEventHandler(eventName string) chan []byte { + r.eventHandlersMu.Lock() + defer r.eventHandlersMu.Unlock() -func (nullHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { + if ch, ok := r.eventHandlers[eventName]; ok { + return ch + } + + ch := make(chan []byte) + r.eventHandlers[eventName] = ch + return ch +} + +func (r *rpcClient) eventHandler(eventName string) chan []byte { + r.eventHandlersMu.RLock() + defer r.eventHandlersMu.RUnlock() + + return r.eventHandlers[eventName] +} + +type requestHandler struct { + rpcClient *rpcClient +} + +func newRequestHandler(rpcClient *rpcClient) *requestHandler { + return &requestHandler{rpcClient: rpcClient} +} + +func (e *requestHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { + handler := e.rpcClient.eventHandler(req.Method) + if handler == nil { + return // noop + } + + select { + case handler <- *req.Params: + default: + // event handler + } } diff --git a/pkg/liveshare/rpc_test.go b/pkg/liveshare/rpc_test.go new file mode 100644 index 000000000..0f78b8780 --- /dev/null +++ b/pkg/liveshare/rpc_test.go @@ -0,0 +1,55 @@ +package liveshare + +import ( + "context" + "encoding/json" + "fmt" + "net" + "testing" + + "github.com/sourcegraph/jsonrpc2" +) + +func TestRequestHandler(t *testing.T) { + r, w := net.Pipe() + client := newRPCClient(r) + + ctx := context.Background() + client.connect(ctx) + + type params struct { + Data string `json:"data"` + } + + ev := client.registerEventHandler("testEvent") + done := make(chan error) + go func() { + b := <-ev + var receivedParams params + if err := json.Unmarshal(b, &receivedParams); err != nil { + done <- err + return + } + if receivedParams.Data != "test" { + done <- fmt.Errorf("expected test, got %q", receivedParams.Data) + } + done <- nil + }() + + go func() { + codec := jsonrpc2.VSCodeObjectCodec{} + type message struct { + Method string `json:"method"` + Params params `json:"params"` + } + codec.WriteObject(w, message{ + Method: "testEvent", + Params: params{"test"}, + }) + }() + + err := <-done + if err != nil { + t.Fatal(err) + } +} diff --git a/pkg/liveshare/session.go b/pkg/liveshare/session.go index b4bc3c16f..715d24fc6 100644 --- a/pkg/liveshare/session.go +++ b/pkg/liveshare/session.go @@ -78,6 +78,10 @@ func (s *Session) UpdateSharedServerPrivacy(ctx context.Context, port int, visib return nil } +func (s *Session) WaitForEvent(eventName string) chan []byte { + return s.rpc.registerEventHandler(eventName) +} + // StartsSSHServer starts an SSH server in the container, installing sshd if necessary, // and returns the port on which it listens and the user name clients should provide. func (s *Session) StartSSHServer(ctx context.Context) (int, string, error) {