Refactor port update data into liveshare

This commit is contained in:
Jose Garcia 2022-02-25 08:09:12 -05:00
parent 9556c72ecf
commit 3847d965da
5 changed files with 115 additions and 141 deletions

View file

@ -18,6 +18,7 @@ import (
"github.com/cli/cli/v2/pkg/liveshare"
"github.com/cli/cli/v2/utils"
"github.com/muhammadmuzzammil1998/jsonc"
"github.com/sourcegraph/jsonrpc2"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)
@ -273,8 +274,19 @@ func (a *App) UpdatePortVisibility(ctx context.Context, codespaceName string, ar
}
defer safeClose(session, &err)
success := session.RegisterEvent("serverSharing.sharingSucceeded")
failure := session.RegisterEvent("serverSharing.sharingFailed")
notificationUpdate := make(chan portUpdateNotification)
h := func(success bool) func(*jsonrpc2.Request) {
return func(req *jsonrpc2.Request) {
var notification portUpdateNotification
if err := json.Unmarshal(*req.Params, &notification); err != nil {
return
}
notification.success = success
notificationUpdate <- notification
}
}
session.RegisterRequestHandler("serverSharing.sharingSucceeded", h(true))
session.RegisterRequestHandler("serverSharing.sharingFailed", h(false))
// TODO: check if port visibility can be updated in parallel instead of sequentially
for _, port := range ports {
@ -289,7 +301,7 @@ func (a *App) UpdatePortVisibility(ctx context.Context, codespaceName string, ar
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := a.waitForPortUpdate(ctx, success, failure, session, port.number); err != nil {
if err := a.waitForPortUpdate(ctx, notificationUpdate, session, port.number); err != nil {
return newErrUpdatingPortVisibility(port.number, port.visibility, err)
}
@ -299,43 +311,30 @@ func (a *App) UpdatePortVisibility(ctx context.Context, codespaceName string, ar
return nil
}
type portChangeKind string
const (
portChangeKindUpdate portChangeKind = "update"
)
type portData struct {
Port int `json:"port"`
ChangeKind portChangeKind `json:"changeKind"`
ErrorDetail string `json:"errorDetail"`
StatusCode int `json:"statusCode"`
type portUpdateNotification struct {
liveshare.PortUpdate
success bool
}
var errUpdatePortVisibilityForbidden = errors.New("organization admin has forbidden this privacy setting")
func (a *App) waitForPortUpdate(ctx context.Context, success, failure chan []byte, session *liveshare.Session, port int) error {
func (a *App) waitForPortUpdate(ctx context.Context, n chan portUpdateNotification, session *liveshare.Session, port int) error {
for {
var pd portData
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for server sharing to succeed or fail")
case b := <-success:
if err := json.Unmarshal(b, &pd); err != nil {
return fmt.Errorf("error unmarshaling port data: %w", err)
}
if pd.Port == port && pd.ChangeKind == portChangeKindUpdate {
return nil
}
case b := <-failure:
if err := json.Unmarshal(b, &pd); err != nil {
return fmt.Errorf("error unmarshaling port data: %w", err)
}
if pd.Port == port && pd.ChangeKind == portChangeKindUpdate {
if pd.StatusCode == http.StatusForbidden {
return errUpdatePortVisibilityForbidden
case update := <-n:
if update.success {
if update.Port == port && update.ChangeKind == liveshare.PortChangeKindUpdate {
return nil
}
} else {
if update.Port == port && update.ChangeKind == liveshare.PortChangeKindUpdate {
if update.StatusCode == http.StatusForbidden {
return errUpdatePortVisibilityForbidden
}
return errors.New(update.ErrorDetail)
}
return errors.New(pd.ErrorDetail)
}
}
}

View file

@ -9,6 +9,7 @@ import (
"github.com/cli/cli/v2/internal/codespaces/api"
"github.com/cli/cli/v2/pkg/iostreams"
"github.com/cli/cli/v2/pkg/liveshare"
livesharetest "github.com/cli/cli/v2/pkg/liveshare/test"
"github.com/sourcegraph/jsonrpc2"
)
@ -26,18 +27,24 @@ func TestPortsUpdateVisibilitySuccess(t *testing.T) {
}
eventResponses := []string{
"sharingSucceeded",
"sharingSucceeded",
"serverSharing.sharingSucceeded",
"serverSharing.sharingSucceeded",
}
portsData := []portData{
portsData := []portUpdateNotification{
{
Port: 80,
ChangeKind: portChangeKindUpdate,
success: true,
PortUpdate: liveshare.PortUpdate{
Port: 80,
ChangeKind: liveshare.PortChangeKindUpdate,
},
},
{
Port: 9999,
ChangeKind: portChangeKindUpdate,
success: true,
PortUpdate: liveshare.PortUpdate{
Port: 9999,
ChangeKind: liveshare.PortChangeKindUpdate,
},
},
}
@ -61,20 +68,26 @@ func TestPortsUpdateVisibilityFailure403(t *testing.T) {
}
eventResponses := []string{
"sharingSucceeded",
"sharingFailed",
"serverSharing.sharingSucceeded",
"serverSharing.sharingFailed",
}
portsData := []portData{
portsData := []portUpdateNotification{
{
Port: 80,
ChangeKind: portChangeKindUpdate,
success: true,
PortUpdate: liveshare.PortUpdate{
Port: 80,
ChangeKind: liveshare.PortChangeKindUpdate,
},
},
{
Port: 9999,
ChangeKind: portChangeKindUpdate,
ErrorDetail: "test error",
StatusCode: 403,
success: false,
PortUpdate: liveshare.PortUpdate{
Port: 9999,
ChangeKind: liveshare.PortChangeKindUpdate,
ErrorDetail: "test error",
StatusCode: 403,
},
},
}
@ -101,19 +114,25 @@ func TestPortsUpdateVisibilityFailure(t *testing.T) {
}
eventResponses := []string{
"sharingSucceeded",
"sharingFailed",
"serverSharing.sharingSucceeded",
"serverSharing.sharingFailed",
}
portsData := []portData{
portsData := []portUpdateNotification{
{
Port: 80,
ChangeKind: portChangeKindUpdate,
success: true,
PortUpdate: liveshare.PortUpdate{
Port: 80,
ChangeKind: liveshare.PortChangeKindUpdate,
},
},
{
Port: 9999,
ChangeKind: portChangeKindUpdate,
ErrorDetail: "test error",
success: false,
PortUpdate: liveshare.PortUpdate{
Port: 9999,
ChangeKind: liveshare.PortChangeKindUpdate,
ErrorDetail: "test error",
},
},
}
@ -132,7 +151,7 @@ type joinWorkspaceResult struct {
SessionNumber int `json:"sessionNumber"`
}
func runUpdateVisibilityTest(portVisibilities []portVisibility, eventResponses []string, portsData []portData) error {
func runUpdateVisibilityTest(portVisibilities []portVisibility, eventResponses []string, portsData []portUpdateNotification) error {
joinWorkspace := func(req *jsonrpc2.Request) (interface{}, error) {
return joinWorkspaceResult{1}, nil
}
@ -163,7 +182,7 @@ func runUpdateVisibilityTest(portVisibilities []portVisibility, eventResponses [
type rpcMessage struct {
Method string
Params portData
Params liveshare.PortUpdate
}
go func() {
@ -174,14 +193,10 @@ func runUpdateVisibilityTest(portVisibilities []portVisibility, eventResponses [
return
case <-ch:
pd := portsData[i]
// TODO: handle error
err := testServer.WriteToObjectStream(rpcMessage{
_ := testServer.WriteToObjectStream(rpcMessage{
Method: eventResponses[i],
Params: pd,
Params: pd.PortUpdate,
})
if err != nil {
panic(err)
}
}
}
}()

View file

@ -42,41 +42,39 @@ func (r *rpcClient) do(ctx context.Context, method string, args, result interfac
return waiter.Wait(waitCtx, result)
}
type handlerFn func(req *jsonrpc2.Request)
type requestHandler struct {
eventHandlersMu sync.RWMutex
eventHandlers map[string]chan []byte
handlersMu sync.RWMutex
handlers map[string]handlerFn
}
func newRequestHandler() *requestHandler {
return &requestHandler{eventHandlers: make(map[string]chan []byte)}
return &requestHandler{handlers: make(map[string]handlerFn)}
}
func (r *requestHandler) registerEvent(eventName string) chan []byte {
r.eventHandlersMu.Lock()
defer r.eventHandlersMu.Unlock()
func (r *requestHandler) register(requestType string, handler handlerFn) {
r.handlersMu.Lock()
defer r.handlersMu.Unlock()
if ch, ok := r.eventHandlers[eventName]; ok {
return ch
if _, ok := r.handlers[requestType]; ok {
return
}
ch := make(chan []byte)
r.eventHandlers[eventName] = ch
return ch
r.handlers[requestType] = handler
}
func (r *requestHandler) eventHandler(eventName string) chan []byte {
r.eventHandlersMu.RLock()
defer r.eventHandlersMu.RUnlock()
func (r *requestHandler) handler(requestType string) handlerFn {
r.handlersMu.RLock()
defer r.handlersMu.RUnlock()
return r.eventHandlers[eventName]
return r.handlers[requestType]
}
func (r *requestHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
fmt.Println(req.Method)
handler := r.eventHandler(req.Method)
if handler == nil {
return // noop
if handler := r.handler(req.Method); handler != nil {
go func() {
handler(req)
}()
}
handler <- *req.Params
}

View file

@ -1,55 +0,0 @@
package liveshare
import (
"context"
"encoding/json"
"fmt"
"net"
"testing"
"github.com/sourcegraph/jsonrpc2"
)
func TestRequestHandler(t *testing.T) {
r, w := net.Pipe()
client := newRPCClient(r)
ctx := context.Background()
client.connect(ctx)
type params struct {
Data string `json:"data"`
}
ev := client.registerEventHandler("testEvent")
done := make(chan error)
go func() {
b := <-ev
var receivedParams params
if err := json.Unmarshal(b, &receivedParams); err != nil {
done <- err
return
}
if receivedParams.Data != "test" {
done <- fmt.Errorf("expected test, got %q", receivedParams.Data)
}
done <- nil
}()
go func() {
codec := jsonrpc2.VSCodeObjectCodec{}
type message struct {
Method string `json:"method"`
Params params `json:"params"`
}
codec.WriteObject(w, message{
Method: "testEvent",
Params: params{"test"},
})
}()
err := <-done
if err != nil {
t.Fatal(err)
}
}

View file

@ -44,6 +44,19 @@ type Port struct {
Privacy string `json:"privacy"`
}
type PortChangeKind string
const (
PortChangeKindUpdate PortChangeKind = "update"
)
type PortUpdate struct {
Port int `json:"port"`
ChangeKind PortChangeKind `json:"changeKind"`
ErrorDetail string `json:"errorDetail"`
StatusCode int `json:"statusCode"`
}
// startSharing tells the Live Share host to start sharing the specified port from the container.
// The sessionName describes the purpose of the remote port or service.
// It returns an identifier that can be used to open an SSH channel to the remote port.
@ -78,8 +91,12 @@ func (s *Session) UpdateSharedServerPrivacy(ctx context.Context, port int, visib
return nil
}
func (s *Session) RegisterEvent(eventName string) chan []byte {
return s.rpc.requestHandler.registerEvent(eventName)
// RegisterRequestHandler allows the caller to register a jsonrpc request handler
// for a given request type. The handler will be called when the request is received
// by the session's RPC server. If the request type has already been registered, the function will
// noop.
func (s *Session) RegisterRequestHandler(requestType string, h handlerFn) {
s.rpc.requestHandler.register(requestType, h)
}
// StartsSSHServer starts an SSH server in the container, installing sshd if necessary,