diff --git a/adapter.go b/adapter.go new file mode 100644 index 000000000..fb3424734 --- /dev/null +++ b/adapter.go @@ -0,0 +1,100 @@ +package liveshare + +import ( + "errors" + "io" + "net" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type Adapter struct { + conn *websocket.Conn + readMutex sync.Mutex + writeMutex sync.Mutex + reader io.Reader +} + +func NewAdapter(conn *websocket.Conn) *Adapter { + return &Adapter{ + conn: conn, + } +} + +func (a *Adapter) Read(b []byte) (int, error) { + // Read() can be called concurrently, and we mutate some internal state here + a.readMutex.Lock() + defer a.readMutex.Unlock() + + if a.reader == nil { + messageType, reader, err := a.conn.NextReader() + if err != nil { + return 0, err + } + + if messageType != websocket.BinaryMessage { + return 0, errors.New("unexpected websocket message type") + } + + a.reader = reader + } + + bytesRead, err := a.reader.Read(b) + if err != nil { + a.reader = nil + + // EOF for the current Websocket frame, more will probably come so.. + if err == io.EOF { + // .. we must hide this from the caller since our semantics are a + // stream of bytes across many frames + err = nil + } + } + + return bytesRead, err +} + +func (a *Adapter) Write(b []byte) (int, error) { + a.writeMutex.Lock() + defer a.writeMutex.Unlock() + + nextWriter, err := a.conn.NextWriter(websocket.BinaryMessage) + if err != nil { + return 0, err + } + + bytesWritten, err := nextWriter.Write(b) + nextWriter.Close() + + return bytesWritten, err +} + +func (a *Adapter) Close() error { + return a.conn.Close() +} + +func (a *Adapter) LocalAddr() net.Addr { + return a.conn.LocalAddr() +} + +func (a *Adapter) RemoteAddr() net.Addr { + return a.conn.RemoteAddr() +} + +func (a *Adapter) SetDeadline(t time.Time) error { + if err := a.SetReadDeadline(t); err != nil { + return err + } + + return a.SetWriteDeadline(t) +} + +func (a *Adapter) SetReadDeadline(t time.Time) error { + return a.conn.SetReadDeadline(t) +} + +func (a *Adapter) SetWriteDeadline(t time.Time) error { + return a.conn.SetWriteDeadline(t) +} diff --git a/api.go b/api.go index c7efb9830..d8a4bebbd 100644 --- a/api.go +++ b/api.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "net/http" + "net/http/httputil" "strings" ) @@ -31,27 +32,28 @@ func NewAPI(configuration *Configuration) *API { } type WorkspaceAccessResponse struct { - SessionToken string `json:"sessionToken"` - CreatedAt string `json:"createdAt"` - UpdatedAt string `json:"updatedAt"` - Name string `json:"name"` - OwnerID string `json:"ownerId"` - JoinLink string `json:"joinLink"` - ConnectLinks []string `json:"connectLinks"` - RelayLink string `json:"relayLink"` - RelaySas string `json:"relaySas"` - HostPublicKeys []string `json:"hostPublicKeys"` - ConversationID string `json:"conversationId"` - AssociatedUserIDs []string `json:"associatedUserIds"` - AreAnonymousGuestsAllowed bool `json:"areAnonymousGuestsAllowed"` - IsHostConnected bool `json:"isHostConnected"` - ExpiresAt string `json:"expiresAt"` - InvitationLinks []string `json:"invitationLinks"` - ID string `json:"id"` + SessionToken string `json:"sessionToken"` + CreatedAt string `json:"createdAt"` + UpdatedAt string `json:"updatedAt"` + Name string `json:"name"` + OwnerID string `json:"ownerId"` + JoinLink string `json:"joinLink"` + ConnectLinks []string `json:"connectLinks"` + RelayLink string `json:"relayLink"` + RelaySas string `json:"relaySas"` + HostPublicKeys []string `json:"hostPublicKeys"` + ConversationID string `json:"conversationId"` + AssociatedUserIDs map[string]string `json:"associatedUserIds"` + AreAnonymousGuestsAllowed bool `json:"areAnonymousGuestsAllowed"` + IsHostConnected bool `json:"isHostConnected"` + ExpiresAt string `json:"expiresAt"` + InvitationLinks []string `json:"invitationLinks"` + ID string `json:"id"` } func (a *API) WorkspaceAccess() (*WorkspaceAccessResponse, error) { url := fmt.Sprintf("%s/workspace/%s/user", a.ServiceURI, a.WorkspaceID) + fmt.Println(url) req, err := http.NewRequest(http.MethodPut, url, nil) if err != nil { @@ -69,6 +71,8 @@ func (a *API) WorkspaceAccess() (*WorkspaceAccessResponse, error) { return nil, fmt.Errorf("error reading response body: %v", err) } + d, _ := httputil.DumpResponse(resp, true) + fmt.Println(string(d)) var workspaceAccessResponse WorkspaceAccessResponse if err := json.Unmarshal(b, &workspaceAccessResponse); err != nil { return nil, fmt.Errorf("error unmarshaling response into json: %v", err) @@ -94,7 +98,7 @@ type WorkspaceInfoResponse struct { RelaySas string `json:"relaySas"` HostPublicKeys []string `json:"hostPublicKeys"` ConversationID string `json:"conversationId"` - AssociatedUserIDs []string `json:"associatedUserIds"` + AssociatedUserIDs map[string]string AreAnonymousGuestsAllowed bool `json:"areAnonymousGuestsAllowed"` IsHostConnected bool `json:"isHostConnected"` ExpiresAt string `json:"expiresAt"` diff --git a/client.go b/client.go index a58a34c9b..0a89a125c 100644 --- a/client.go +++ b/client.go @@ -7,10 +7,11 @@ import ( type Client struct { Configuration *Configuration + SSHSession *SSHSession } func NewClient(configuration *Configuration) *Client { - return &Client{configuration} + return &Client{Configuration: configuration} } func (c *Client) Join(ctx context.Context) error { @@ -19,9 +20,9 @@ func (c *Client) Join(ctx context.Context) error { return fmt.Errorf("error getting session: %v", err) } - sshSession := NewSSHSession(session) - if err := sshSession.Connect(); err != nil { - return fmt.Errorf("error authenticating ssh session: %v", err) + c.SSHSession, err = NewSSH(session).NewSession() + if err != nil { + return fmt.Errorf("error connecting to ssh session: %v", err) } return nil diff --git a/liveshare.go b/liveshare.go index a8c8b69d6..174eac20f 100644 --- a/liveshare.go +++ b/liveshare.go @@ -3,10 +3,14 @@ package liveshare import ( "context" "fmt" + "net/rpc" ) type LiveShare struct { Configuration *Configuration + + workspaceClient *Client + terminal *Terminal } func New(opts ...Option) (*LiveShare, error) { @@ -22,14 +26,67 @@ func New(opts ...Option) (*LiveShare, error) { return nil, fmt.Errorf("error validating configuration: %v", err) } - return &LiveShare{configuration}, nil + return &LiveShare{Configuration: configuration}, nil } func (l *LiveShare) Connect(ctx context.Context) error { - workspaceClient := NewClient(l.Configuration) - if err := workspaceClient.Join(ctx); err != nil { + l.workspaceClient = NewClient(l.Configuration) + if err := l.workspaceClient.Join(ctx); err != nil { return fmt.Errorf("error joining with workspace client: %v", err) } return nil } + +type Terminal struct { + WorkspaceClient *Client + RPCClient *rpc.Client +} + +func (l *LiveShare) NewTerminal() *Terminal { + return &Terminal{ + WorkspaceClient: l.workspaceClient, + RPCClient: rpc.NewClient(l.workspaceClient.SSHSession), + } +} + +type TerminalCommand struct { + Terminal *Terminal + Cwd string + Cmd string +} + +func (t *Terminal) NewCommand(cwd, cmd string) TerminalCommand { + return TerminalCommand{t, cwd, cmd} +} + +type RunArgs struct { + Name string + Rows, Cols int + App string + Cwd string + CommandLine []string + ReadOnlyForGuests bool +} + +func (t TerminalCommand) Run(ctx context.Context) ([]byte, error) { + args := RunArgs{ + Name: "RunCommand", + Rows: 10, + Cols: 80, + App: "/bin/bash", + Cwd: t.Cwd, + CommandLine: []string{"-c", t.Cmd}, + ReadOnlyForGuests: false, + } + + var output []byte + runCall := t.Terminal.RPCClient.Go("terminal.startAsync", &args, &output, nil) + + runReply := <-runCall.Done + if runReply.Error != nil { + return nil, fmt.Errorf("error startAsync operation: %v", runReply.Error) + } + fmt.Printf("%+v\n\n", runReply) + return output, nil +} diff --git a/ssh.go b/ssh.go index af132b331..51906290e 100644 --- a/ssh.go +++ b/ssh.go @@ -2,29 +2,66 @@ package liveshare import ( "fmt" + "io" "net" "net/url" "strings" + "time" + "github.com/gorilla/websocket" "golang.org/x/crypto/ssh" - "golang.org/x/net/websocket" ) -type SSHSession struct { - Session *Session - VersionExchangeError chan error +type SSH struct { + Session *Session } -func NewSSHSession(session *Session) *SSHSession { - return &SSHSession{ +func NewSSH(session *Session) *SSH { + return &SSH{ Session: session, } } -func (s *SSHSession) Connect() error { +// Reference: +// https://github.com/Azure/azure-relay-node/blob/7b57225365df3010163bf4b9e640868a02737eb6/hyco-ws/index.js#L107-L137 +func (s *SSH) relayURI(action string) string { + relaySas := url.QueryEscape(s.Session.WorkspaceAccess.RelaySas) + relayURI := s.Session.WorkspaceAccess.RelayLink + relayURI = strings.Replace(relayURI, "sb:", "wss:", -1) + relayURI = strings.Replace(relayURI, ".net/", ".net:443/$hc/", 1) + relayURI = relayURI + "?sb-hc-action=" + action + "&sb-hc-token=" + relaySas + return relayURI +} + +func (s *SSH) socketStream() (net.Conn, error) { + uri := s.relayURI("connect") + + ws, _, err := websocket.DefaultDialer.Dial(uri, nil) + if err != nil { + return nil, fmt.Errorf("error dialing websocket connection: %v", err) + } + + return NewAdapter(ws), nil +} + +type SSHSession struct { + *ssh.Session + reader io.Reader + writer io.Writer +} + +func (s SSHSession) Read(p []byte) (n int, err error) { + return s.reader.Read(p) +} + +func (s SSHSession) Write(p []byte) (n int, err error) { + return s.writer.Write(p) +} + +func (s *SSH) NewSession() (*SSHSession, error) { socketStream, err := s.socketStream() if err != nil { - return fmt.Errorf("error creating socket stream: %v", err) + return nil, fmt.Errorf("error creating socket stream: %v", err) } clientConfig := ssh.ClientConfig{ @@ -36,35 +73,29 @@ func (s *SSHSession) Connect() error { // TODO(josebalius): implement return nil }, + Timeout: 10 * time.Second, } sshClientConn, chans, reqs, err := ssh.NewClientConn(socketStream, "", &clientConfig) if err != nil { - return fmt.Errorf("error creating ssh client connection: %v", err) + return nil, fmt.Errorf("error creating ssh client connection: %v", err) } - fmt.Println(sshClientConn, chans, reqs) - - return nil -} - -// Reference: -// https://github.com/Azure/azure-relay-node/blob/7b57225365df3010163bf4b9e640868a02737eb6/hyco-ws/index.js#L107-L137 -func (s *SSHSession) relayURI(action string) string { - relaySas := url.QueryEscape(s.Session.WorkspaceAccess.RelaySas) - relayURI := s.Session.WorkspaceAccess.RelayLink - relayURI = strings.Replace(relayURI, "sb:", "wss:", -1) - relayURI = strings.Replace(relayURI, ".net/", ".net:443/$hc/", 1) - relayURI = relayURI + "?sb-hc-action=" + action + "&sb-hc-token=" + relaySas - return relayURI -} - -func (s *SSHSession) socketStream() (*websocket.Conn, error) { - uri := s.relayURI("connect") - ws, err := websocket.Dial(uri, "", uri) + sshClient := ssh.NewClient(sshClientConn, chans, reqs) + sshSession, err := sshClient.NewSession() if err != nil { - return nil, fmt.Errorf("error dialing relay connection: %v", err) + return nil, fmt.Errorf("error creating ssh client session: %v", err) } - return ws, nil + reader, err := sshSession.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("error creating ssh session reader: %v", err) + } + + writer, err := sshSession.StdinPipe() + if err != nil { + return nil, fmt.Errorf("error creating ssh session writer: %v", err) + } + + return &SSHSession{Session: sshSession, reader: reader, writer: writer}, nil }