diff --git a/client.go b/client.go index 566db6cd3..65e80a94a 100644 --- a/client.go +++ b/client.go @@ -115,6 +115,10 @@ func (c *Client) joinWorkspace(ctx context.Context, rpc *rpcClient) (*joinWorksp } func (s *Session) openStreamingChannel(ctx context.Context, id channelID) (ssh.Channel, error) { + type getStreamArgs struct { + StreamName string `json:"streamName"` + Condition string `json:"condition"` + } args := getStreamArgs{ StreamName: id.name, Condition: id.condition, diff --git a/rpc.go b/rpc.go index 10aa2c7eb..68e187ad6 100644 --- a/rpc.go +++ b/rpc.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "sync" "github.com/opentracing/opentracing-go" "github.com/sourcegraph/jsonrpc2" @@ -12,18 +11,17 @@ import ( type rpcClient struct { *jsonrpc2.Conn - conn io.ReadWriteCloser - handler *rpcHandler + conn io.ReadWriteCloser } func newRPCClient(conn io.ReadWriteCloser) *rpcClient { - return &rpcClient{conn: conn, handler: newRPCHandler()} + return &rpcClient{conn: conn} } func (r *rpcClient) connect(ctx context.Context) { stream := jsonrpc2.NewBufferedStream(r.conn, jsonrpc2.VSCodeObjectCodec{}) // TODO(adonovan): fix: ensure r.Conn is eventually Closed! - r.Conn = jsonrpc2.NewConn(ctx, stream, r.handler) + r.Conn = jsonrpc2.NewConn(ctx, stream, nullHandler{}) } func (r *rpcClient) do(ctx context.Context, method string, args, result interface{}) error { @@ -38,36 +36,7 @@ func (r *rpcClient) do(ctx context.Context, method string, args, result interfac return waiter.Wait(ctx, result) } -type rpcHandlerFunc = func(*jsonrpc2.Request) +type nullHandler struct{} -type rpcHandler struct { - handlersMu sync.Mutex - handlers map[string][]rpcHandlerFunc -} - -func newRPCHandler() *rpcHandler { - return &rpcHandler{ - handlers: make(map[string][]rpcHandlerFunc), - } -} - -// registerEventHandler registers a handler for the specified event. -// After the next occurrence of the event, the handler will be called, -// once, in its own goroutine. -func (r *rpcHandler) registerEventHandler(eventMethod string, h rpcHandlerFunc) { - r.handlersMu.Lock() - r.handlers[eventMethod] = append(r.handlers[eventMethod], h) - r.handlersMu.Unlock() -} - -// Handle calls all registered handlers for the request, concurrently, each in its own goroutine. -func (r *rpcHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { - r.handlersMu.Lock() - handlers := r.handlers[req.Method] - r.handlers[req.Method] = nil - r.handlersMu.Unlock() - - for _, h := range handlers { - go h(req) - } +func (nullHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { } diff --git a/rpc_test.go b/rpc_test.go deleted file mode 100644 index cf9c4cf81..000000000 --- a/rpc_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package liveshare - -import ( - "context" - "testing" - "time" - - "github.com/sourcegraph/jsonrpc2" -) - -func TestRPCHandlerEvents(t *testing.T) { - rpcHandler := newRPCHandler() - eventCh := make(chan *jsonrpc2.Request) - rpcHandler.registerEventHandler("somethingHappened", func(req *jsonrpc2.Request) { - eventCh <- req - }) - go func() { - time.Sleep(1 * time.Second) - rpcHandler.Handle(context.Background(), nil, &jsonrpc2.Request{Method: "somethingHappened"}) - }() - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) - defer cancel() - select { - case event := <-eventCh: - if event.Method != "somethingHappened" { - t.Error("event.Method is not the expect value") - } - case <-ctx.Done(): - t.Error("Test time out") - } -} diff --git a/terminal.go b/terminal.go deleted file mode 100644 index 24a0f5121..000000000 --- a/terminal.go +++ /dev/null @@ -1,116 +0,0 @@ -package liveshare - -import ( - "context" - "fmt" - "io" - - "github.com/sourcegraph/jsonrpc2" - "golang.org/x/crypto/ssh" -) - -type Terminal struct { - session *Session -} - -func NewTerminal(session *Session) *Terminal { - return &Terminal{session: session} -} - -type TerminalCommand struct { - terminal *Terminal - cwd string - cmd string -} - -func (t *Terminal) NewCommand(cwd, cmd string) TerminalCommand { - return TerminalCommand{t, cwd, cmd} -} - -type runArgs struct { - Name string `json:"name"` - Rows int `json:"rows"` - Cols int `json:"cols"` - App string `json:"app"` - Cwd string `json:"cwd"` - CommandLine []string `json:"commandLine"` - ReadOnlyForGuests bool `json:"readOnlyForGuests"` -} - -type startTerminalResult struct { - ID int `json:"id"` - StreamName string `json:"streamName"` - StreamCondition string `json:"streamCondition"` - LocalPipeName string `json:"localPipeName"` - AppProcessID int `json:"appProcessId"` -} - -type getStreamArgs struct { - StreamName string `json:"streamName"` - Condition string `json:"condition"` -} - -type stopTerminalArgs struct { - ID int `json:"id"` -} - -func (t TerminalCommand) Run(ctx context.Context) (io.ReadCloser, error) { - args := runArgs{ - Name: "RunCommand", - Rows: 10, - Cols: 80, - App: "/bin/bash", - Cwd: t.cwd, - CommandLine: []string{"-c", t.cmd}, - ReadOnlyForGuests: false, - } - - started := make(chan struct{}) - t.terminal.session.rpc.handler.registerEventHandler("terminal.terminalStarted", func(*jsonrpc2.Request) { - close(started) - }) - var result startTerminalResult - if err := t.terminal.session.rpc.do(ctx, "terminal.startTerminal", &args, &result); err != nil { - return nil, fmt.Errorf("error making terminal.startTerminal call: %v", err) - } - <-started - - channel, err := t.terminal.session.openStreamingChannel(ctx, channelID{result.StreamName, result.StreamCondition}) - if err != nil { - return nil, fmt.Errorf("error opening streaming channel: %v", err) - } - - return t.newTerminalReadCloser(result.ID, channel), nil -} - -type terminalReadCloser struct { - terminalCommand TerminalCommand - terminalID int - channel ssh.Channel -} - -func (t TerminalCommand) newTerminalReadCloser(terminalID int, channel ssh.Channel) io.ReadCloser { - return terminalReadCloser{t, terminalID, channel} -} - -func (t terminalReadCloser) Read(b []byte) (int, error) { - return t.channel.Read(b) -} - -func (t terminalReadCloser) Close() error { - stopped := make(chan struct{}) - t.terminalCommand.terminal.session.rpc.handler.registerEventHandler("terminal.terminalStopped", func(*jsonrpc2.Request) { - close(stopped) - }) - if err := t.terminalCommand.terminal.session.rpc.do(context.Background(), "terminal.stopTerminal", []int{t.terminalID}, nil); err != nil { - return fmt.Errorf("error making terminal.stopTerminal call: %v", err) - } - - if err := t.channel.Close(); err != nil && err != io.EOF { - return fmt.Errorf("error closing channel: %v", err) - } - - <-stopped - - return nil -}