RPC functionality started take two

This commit is contained in:
Jose Garcia 2021-06-24 20:45:03 -04:00
parent a8b1b87f7b
commit 897ab1598b
5 changed files with 248 additions and 55 deletions

100
adapter.go Normal file
View file

@ -0,0 +1,100 @@
package liveshare
import (
"errors"
"io"
"net"
"sync"
"time"
"github.com/gorilla/websocket"
)
type Adapter struct {
conn *websocket.Conn
readMutex sync.Mutex
writeMutex sync.Mutex
reader io.Reader
}
func NewAdapter(conn *websocket.Conn) *Adapter {
return &Adapter{
conn: conn,
}
}
func (a *Adapter) Read(b []byte) (int, error) {
// Read() can be called concurrently, and we mutate some internal state here
a.readMutex.Lock()
defer a.readMutex.Unlock()
if a.reader == nil {
messageType, reader, err := a.conn.NextReader()
if err != nil {
return 0, err
}
if messageType != websocket.BinaryMessage {
return 0, errors.New("unexpected websocket message type")
}
a.reader = reader
}
bytesRead, err := a.reader.Read(b)
if err != nil {
a.reader = nil
// EOF for the current Websocket frame, more will probably come so..
if err == io.EOF {
// .. we must hide this from the caller since our semantics are a
// stream of bytes across many frames
err = nil
}
}
return bytesRead, err
}
func (a *Adapter) Write(b []byte) (int, error) {
a.writeMutex.Lock()
defer a.writeMutex.Unlock()
nextWriter, err := a.conn.NextWriter(websocket.BinaryMessage)
if err != nil {
return 0, err
}
bytesWritten, err := nextWriter.Write(b)
nextWriter.Close()
return bytesWritten, err
}
func (a *Adapter) Close() error {
return a.conn.Close()
}
func (a *Adapter) LocalAddr() net.Addr {
return a.conn.LocalAddr()
}
func (a *Adapter) RemoteAddr() net.Addr {
return a.conn.RemoteAddr()
}
func (a *Adapter) SetDeadline(t time.Time) error {
if err := a.SetReadDeadline(t); err != nil {
return err
}
return a.SetWriteDeadline(t)
}
func (a *Adapter) SetReadDeadline(t time.Time) error {
return a.conn.SetReadDeadline(t)
}
func (a *Adapter) SetWriteDeadline(t time.Time) error {
return a.conn.SetWriteDeadline(t)
}

40
api.go
View file

@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
"strings"
)
@ -31,27 +32,28 @@ func NewAPI(configuration *Configuration) *API {
}
type WorkspaceAccessResponse struct {
SessionToken string `json:"sessionToken"`
CreatedAt string `json:"createdAt"`
UpdatedAt string `json:"updatedAt"`
Name string `json:"name"`
OwnerID string `json:"ownerId"`
JoinLink string `json:"joinLink"`
ConnectLinks []string `json:"connectLinks"`
RelayLink string `json:"relayLink"`
RelaySas string `json:"relaySas"`
HostPublicKeys []string `json:"hostPublicKeys"`
ConversationID string `json:"conversationId"`
AssociatedUserIDs []string `json:"associatedUserIds"`
AreAnonymousGuestsAllowed bool `json:"areAnonymousGuestsAllowed"`
IsHostConnected bool `json:"isHostConnected"`
ExpiresAt string `json:"expiresAt"`
InvitationLinks []string `json:"invitationLinks"`
ID string `json:"id"`
SessionToken string `json:"sessionToken"`
CreatedAt string `json:"createdAt"`
UpdatedAt string `json:"updatedAt"`
Name string `json:"name"`
OwnerID string `json:"ownerId"`
JoinLink string `json:"joinLink"`
ConnectLinks []string `json:"connectLinks"`
RelayLink string `json:"relayLink"`
RelaySas string `json:"relaySas"`
HostPublicKeys []string `json:"hostPublicKeys"`
ConversationID string `json:"conversationId"`
AssociatedUserIDs map[string]string `json:"associatedUserIds"`
AreAnonymousGuestsAllowed bool `json:"areAnonymousGuestsAllowed"`
IsHostConnected bool `json:"isHostConnected"`
ExpiresAt string `json:"expiresAt"`
InvitationLinks []string `json:"invitationLinks"`
ID string `json:"id"`
}
func (a *API) WorkspaceAccess() (*WorkspaceAccessResponse, error) {
url := fmt.Sprintf("%s/workspace/%s/user", a.ServiceURI, a.WorkspaceID)
fmt.Println(url)
req, err := http.NewRequest(http.MethodPut, url, nil)
if err != nil {
@ -69,6 +71,8 @@ func (a *API) WorkspaceAccess() (*WorkspaceAccessResponse, error) {
return nil, fmt.Errorf("error reading response body: %v", err)
}
d, _ := httputil.DumpResponse(resp, true)
fmt.Println(string(d))
var workspaceAccessResponse WorkspaceAccessResponse
if err := json.Unmarshal(b, &workspaceAccessResponse); err != nil {
return nil, fmt.Errorf("error unmarshaling response into json: %v", err)
@ -94,7 +98,7 @@ type WorkspaceInfoResponse struct {
RelaySas string `json:"relaySas"`
HostPublicKeys []string `json:"hostPublicKeys"`
ConversationID string `json:"conversationId"`
AssociatedUserIDs []string `json:"associatedUserIds"`
AssociatedUserIDs map[string]string
AreAnonymousGuestsAllowed bool `json:"areAnonymousGuestsAllowed"`
IsHostConnected bool `json:"isHostConnected"`
ExpiresAt string `json:"expiresAt"`

View file

@ -7,10 +7,11 @@ import (
type Client struct {
Configuration *Configuration
SSHSession *SSHSession
}
func NewClient(configuration *Configuration) *Client {
return &Client{configuration}
return &Client{Configuration: configuration}
}
func (c *Client) Join(ctx context.Context) error {
@ -19,9 +20,9 @@ func (c *Client) Join(ctx context.Context) error {
return fmt.Errorf("error getting session: %v", err)
}
sshSession := NewSSHSession(session)
if err := sshSession.Connect(); err != nil {
return fmt.Errorf("error authenticating ssh session: %v", err)
c.SSHSession, err = NewSSH(session).NewSession()
if err != nil {
return fmt.Errorf("error connecting to ssh session: %v", err)
}
return nil

View file

@ -3,10 +3,14 @@ package liveshare
import (
"context"
"fmt"
"net/rpc"
)
type LiveShare struct {
Configuration *Configuration
workspaceClient *Client
terminal *Terminal
}
func New(opts ...Option) (*LiveShare, error) {
@ -22,14 +26,67 @@ func New(opts ...Option) (*LiveShare, error) {
return nil, fmt.Errorf("error validating configuration: %v", err)
}
return &LiveShare{configuration}, nil
return &LiveShare{Configuration: configuration}, nil
}
func (l *LiveShare) Connect(ctx context.Context) error {
workspaceClient := NewClient(l.Configuration)
if err := workspaceClient.Join(ctx); err != nil {
l.workspaceClient = NewClient(l.Configuration)
if err := l.workspaceClient.Join(ctx); err != nil {
return fmt.Errorf("error joining with workspace client: %v", err)
}
return nil
}
type Terminal struct {
WorkspaceClient *Client
RPCClient *rpc.Client
}
func (l *LiveShare) NewTerminal() *Terminal {
return &Terminal{
WorkspaceClient: l.workspaceClient,
RPCClient: rpc.NewClient(l.workspaceClient.SSHSession),
}
}
type TerminalCommand struct {
Terminal *Terminal
Cwd string
Cmd string
}
func (t *Terminal) NewCommand(cwd, cmd string) TerminalCommand {
return TerminalCommand{t, cwd, cmd}
}
type RunArgs struct {
Name string
Rows, Cols int
App string
Cwd string
CommandLine []string
ReadOnlyForGuests bool
}
func (t TerminalCommand) Run(ctx context.Context) ([]byte, error) {
args := RunArgs{
Name: "RunCommand",
Rows: 10,
Cols: 80,
App: "/bin/bash",
Cwd: t.Cwd,
CommandLine: []string{"-c", t.Cmd},
ReadOnlyForGuests: false,
}
var output []byte
runCall := t.Terminal.RPCClient.Go("terminal.startAsync", &args, &output, nil)
runReply := <-runCall.Done
if runReply.Error != nil {
return nil, fmt.Errorf("error startAsync operation: %v", runReply.Error)
}
fmt.Printf("%+v\n\n", runReply)
return output, nil
}

91
ssh.go
View file

@ -2,29 +2,66 @@ package liveshare
import (
"fmt"
"io"
"net"
"net/url"
"strings"
"time"
"github.com/gorilla/websocket"
"golang.org/x/crypto/ssh"
"golang.org/x/net/websocket"
)
type SSHSession struct {
Session *Session
VersionExchangeError chan error
type SSH struct {
Session *Session
}
func NewSSHSession(session *Session) *SSHSession {
return &SSHSession{
func NewSSH(session *Session) *SSH {
return &SSH{
Session: session,
}
}
func (s *SSHSession) Connect() error {
// Reference:
// https://github.com/Azure/azure-relay-node/blob/7b57225365df3010163bf4b9e640868a02737eb6/hyco-ws/index.js#L107-L137
func (s *SSH) relayURI(action string) string {
relaySas := url.QueryEscape(s.Session.WorkspaceAccess.RelaySas)
relayURI := s.Session.WorkspaceAccess.RelayLink
relayURI = strings.Replace(relayURI, "sb:", "wss:", -1)
relayURI = strings.Replace(relayURI, ".net/", ".net:443/$hc/", 1)
relayURI = relayURI + "?sb-hc-action=" + action + "&sb-hc-token=" + relaySas
return relayURI
}
func (s *SSH) socketStream() (net.Conn, error) {
uri := s.relayURI("connect")
ws, _, err := websocket.DefaultDialer.Dial(uri, nil)
if err != nil {
return nil, fmt.Errorf("error dialing websocket connection: %v", err)
}
return NewAdapter(ws), nil
}
type SSHSession struct {
*ssh.Session
reader io.Reader
writer io.Writer
}
func (s SSHSession) Read(p []byte) (n int, err error) {
return s.reader.Read(p)
}
func (s SSHSession) Write(p []byte) (n int, err error) {
return s.writer.Write(p)
}
func (s *SSH) NewSession() (*SSHSession, error) {
socketStream, err := s.socketStream()
if err != nil {
return fmt.Errorf("error creating socket stream: %v", err)
return nil, fmt.Errorf("error creating socket stream: %v", err)
}
clientConfig := ssh.ClientConfig{
@ -36,35 +73,29 @@ func (s *SSHSession) Connect() error {
// TODO(josebalius): implement
return nil
},
Timeout: 10 * time.Second,
}
sshClientConn, chans, reqs, err := ssh.NewClientConn(socketStream, "", &clientConfig)
if err != nil {
return fmt.Errorf("error creating ssh client connection: %v", err)
return nil, fmt.Errorf("error creating ssh client connection: %v", err)
}
fmt.Println(sshClientConn, chans, reqs)
return nil
}
// Reference:
// https://github.com/Azure/azure-relay-node/blob/7b57225365df3010163bf4b9e640868a02737eb6/hyco-ws/index.js#L107-L137
func (s *SSHSession) relayURI(action string) string {
relaySas := url.QueryEscape(s.Session.WorkspaceAccess.RelaySas)
relayURI := s.Session.WorkspaceAccess.RelayLink
relayURI = strings.Replace(relayURI, "sb:", "wss:", -1)
relayURI = strings.Replace(relayURI, ".net/", ".net:443/$hc/", 1)
relayURI = relayURI + "?sb-hc-action=" + action + "&sb-hc-token=" + relaySas
return relayURI
}
func (s *SSHSession) socketStream() (*websocket.Conn, error) {
uri := s.relayURI("connect")
ws, err := websocket.Dial(uri, "", uri)
sshClient := ssh.NewClient(sshClientConn, chans, reqs)
sshSession, err := sshClient.NewSession()
if err != nil {
return nil, fmt.Errorf("error dialing relay connection: %v", err)
return nil, fmt.Errorf("error creating ssh client session: %v", err)
}
return ws, nil
reader, err := sshSession.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("error creating ssh session reader: %v", err)
}
writer, err := sshSession.StdinPipe()
if err != nil {
return nil, fmt.Errorf("error creating ssh session writer: %v", err)
}
return &SSHSession{Session: sshSession, reader: reader, writer: writer}, nil
}