From 55fa17d8bc3055ddd143ac0b4e70f8513c01ef70 Mon Sep 17 00:00:00 2001 From: Alan Donovan Date: Tue, 31 Aug 2021 17:30:40 -0400 Subject: [PATCH] wip --- client.go | 2 +- port_forwarder.go | 3 +-- port_forwarder_test.go | 3 ++- rpc.go | 26 +++++++++++++------------- rpc_test.go | 3 ++- 5 files changed, 19 insertions(+), 18 deletions(-) diff --git a/client.go b/client.go index a8a1e3864..628d557b5 100644 --- a/client.go +++ b/client.go @@ -64,7 +64,7 @@ func (c *Client) Join(ctx context.Context) (err error) { return fmt.Errorf("error connecting to ssh session: %v", err) } - c.rpc = newRpcClient(c.ssh) + c.rpc = newRPCClient(c.ssh) c.rpc.connect(ctx) _, err = c.joinWorkspace(ctx) diff --git a/port_forwarder.go b/port_forwarder.go index e6eedf16c..774fec863 100644 --- a/port_forwarder.go +++ b/port_forwarder.go @@ -48,10 +48,9 @@ func (l *PortForwarder) Start(ctx context.Context) error { case err := <-l.errCh: return err case <-ctx.Done(): + // TODO ctx.Error? return ln.Close() } - - return nil } func (l *PortForwarder) StartWithConn(ctx context.Context, conn io.ReadWriteCloser) error { diff --git a/port_forwarder_test.go b/port_forwarder_test.go index 33a33b39b..a3621c075 100644 --- a/port_forwarder_test.go +++ b/port_forwarder_test.go @@ -55,7 +55,8 @@ func TestPortForwarderStart(t *testing.T) { t.Errorf("create new server: %v", err) } - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() pf := NewPortForwarder(client, server, 8000) done := make(chan error) diff --git a/rpc.go b/rpc.go index 8abd0e98f..3fea63b10 100644 --- a/rpc.go +++ b/rpc.go @@ -15,7 +15,7 @@ type rpcClient struct { handler *rpcHandler } -func newRpcClient(conn io.ReadWriteCloser) *rpcClient { +func newRPCClient(conn io.ReadWriteCloser) *rpcClient { return &rpcClient{conn: conn, handler: newRPCHandler()} } @@ -24,17 +24,17 @@ func (r *rpcClient) connect(ctx context.Context) { r.Conn = jsonrpc2.NewConn(ctx, stream, r.handler) } -func (r *rpcClient) do(ctx context.Context, method string, args interface{}, result interface{}) error { +func (r *rpcClient) do(ctx context.Context, method string, args, result interface{}) error { waiter, err := r.Conn.DispatchCall(ctx, method, args) if err != nil { - return fmt.Errorf("error on dispatch call: %v", err) + return fmt.Errorf("error dispatching %q call: %v", method, err) } return waiter.Wait(ctx, result) } type rpcHandler struct { - mutex sync.RWMutex + mutex sync.Mutex eventHandlers map[string][]chan *jsonrpc2.Request } @@ -44,34 +44,34 @@ func newRPCHandler() *rpcHandler { } } +// TODO: document obligations around chan. It appears to be used for at most one request. func (r *rpcHandler) registerEventHandler(eventMethod string) <-chan *jsonrpc2.Request { r.mutex.Lock() defer r.mutex.Unlock() ch := make(chan *jsonrpc2.Request) - if _, ok := r.eventHandlers[eventMethod]; !ok { - r.eventHandlers[eventMethod] = []chan *jsonrpc2.Request{ch} - } else { - r.eventHandlers[eventMethod] = append(r.eventHandlers[eventMethod], ch) - } + r.eventHandlers[eventMethod] = append(r.eventHandlers[eventMethod], ch) return ch } func (r *rpcHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { r.mutex.Lock() - defer r.mutex.Unlock() + handlers := r.eventHandlers[req.Method] + r.eventHandlers[req.Method] = nil + r.mutex.Unlock() - if handlers, ok := r.eventHandlers[req.Method]; ok { + if len(handlers) > 0 { go func() { + // Broadcast the request to each handler in sequence. + // TODO rethink this. needs function call. for _, handler := range handlers { select { case handler <- req: case <-ctx.Done(): + // TODO: ctx.Err break } } - - r.eventHandlers[req.Method] = []chan *jsonrpc2.Request{} }() } } diff --git a/rpc_test.go b/rpc_test.go index d16b32a4f..7543152d1 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -15,7 +15,8 @@ func TestRPCHandlerEvents(t *testing.T) { time.Sleep(1 * time.Second) rpcHandler.Handle(context.Background(), nil, &jsonrpc2.Request{Method: "somethingHappened"}) }() - ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) + defer cancel() select { case event := <-eventCh: if event.Method != "somethingHappened" {