This commit is contained in:
Alan Donovan 2021-08-31 17:30:40 -04:00
parent 273782bcbc
commit 55fa17d8bc
5 changed files with 19 additions and 18 deletions

View file

@ -64,7 +64,7 @@ func (c *Client) Join(ctx context.Context) (err error) {
return fmt.Errorf("error connecting to ssh session: %v", err)
}
c.rpc = newRpcClient(c.ssh)
c.rpc = newRPCClient(c.ssh)
c.rpc.connect(ctx)
_, err = c.joinWorkspace(ctx)

View file

@ -48,10 +48,9 @@ func (l *PortForwarder) Start(ctx context.Context) error {
case err := <-l.errCh:
return err
case <-ctx.Done():
// TODO ctx.Error?
return ln.Close()
}
return nil
}
func (l *PortForwarder) StartWithConn(ctx context.Context, conn io.ReadWriteCloser) error {

View file

@ -55,7 +55,8 @@ func TestPortForwarderStart(t *testing.T) {
t.Errorf("create new server: %v", err)
}
ctx, _ := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pf := NewPortForwarder(client, server, 8000)
done := make(chan error)

26
rpc.go
View file

@ -15,7 +15,7 @@ type rpcClient struct {
handler *rpcHandler
}
func newRpcClient(conn io.ReadWriteCloser) *rpcClient {
func newRPCClient(conn io.ReadWriteCloser) *rpcClient {
return &rpcClient{conn: conn, handler: newRPCHandler()}
}
@ -24,17 +24,17 @@ func (r *rpcClient) connect(ctx context.Context) {
r.Conn = jsonrpc2.NewConn(ctx, stream, r.handler)
}
func (r *rpcClient) do(ctx context.Context, method string, args interface{}, result interface{}) error {
func (r *rpcClient) do(ctx context.Context, method string, args, result interface{}) error {
waiter, err := r.Conn.DispatchCall(ctx, method, args)
if err != nil {
return fmt.Errorf("error on dispatch call: %v", err)
return fmt.Errorf("error dispatching %q call: %v", method, err)
}
return waiter.Wait(ctx, result)
}
type rpcHandler struct {
mutex sync.RWMutex
mutex sync.Mutex
eventHandlers map[string][]chan *jsonrpc2.Request
}
@ -44,34 +44,34 @@ func newRPCHandler() *rpcHandler {
}
}
// TODO: document obligations around chan. It appears to be used for at most one request.
func (r *rpcHandler) registerEventHandler(eventMethod string) <-chan *jsonrpc2.Request {
r.mutex.Lock()
defer r.mutex.Unlock()
ch := make(chan *jsonrpc2.Request)
if _, ok := r.eventHandlers[eventMethod]; !ok {
r.eventHandlers[eventMethod] = []chan *jsonrpc2.Request{ch}
} else {
r.eventHandlers[eventMethod] = append(r.eventHandlers[eventMethod], ch)
}
r.eventHandlers[eventMethod] = append(r.eventHandlers[eventMethod], ch)
return ch
}
func (r *rpcHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
r.mutex.Lock()
defer r.mutex.Unlock()
handlers := r.eventHandlers[req.Method]
r.eventHandlers[req.Method] = nil
r.mutex.Unlock()
if handlers, ok := r.eventHandlers[req.Method]; ok {
if len(handlers) > 0 {
go func() {
// Broadcast the request to each handler in sequence.
// TODO rethink this. needs function call.
for _, handler := range handlers {
select {
case handler <- req:
case <-ctx.Done():
// TODO: ctx.Err
break
}
}
r.eventHandlers[req.Method] = []chan *jsonrpc2.Request{}
}()
}
}

View file

@ -15,7 +15,8 @@ func TestRPCHandlerEvents(t *testing.T) {
time.Sleep(1 * time.Second)
rpcHandler.Handle(context.Background(), nil, &jsonrpc2.Request{Method: "somethingHappened"})
}()
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
select {
case event := <-eventCh:
if event.Method != "somethingHappened" {