From 9e13f6ba6b17b1aec11d7e0971e5748e7541925f Mon Sep 17 00:00:00 2001 From: Jose Garcia Date: Tue, 11 Oct 2022 09:14:23 -0400 Subject: [PATCH] cleanup connect --- internal/codespaces/grpc/client.go | 35 +++++++++++++++--------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index 5ced382f8..76119d659 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -48,6 +48,12 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie if err != nil { return nil, fmt.Errorf("failed to listen to local port over tcp: %w", err) } + localAddress := fmt.Sprintf("127.0.0.1:%d", listener.Addr().(*net.TCPAddr).Port) + + client := &Client{ + token: token, + listener: listener, + } // Create a cancelable context to be able to cancel background tasks // if we encounter an error while connecting to the gRPC server @@ -58,29 +64,27 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie } }() + ch := make(chan error, 2) // Buffered channel to ensure we don't block on the goroutine + // Ensure we close the port forwarder if we encounter an error // or once the gRPC connection is closed. pfcancel is retained // to close the PF whenever we close the gRPC connection. pfctx, pfcancel := context.WithCancel(connectctx) - - ch := make(chan error, 2) // Buffered channel to ensure we don't block on the goroutine + client.cancelPF = pfcancel // Tunnel the remote gRPC server port to the local port - localAddress := fmt.Sprintf("127.0.0.1:%d", listener.Addr().(*net.TCPAddr).Port) go func() { fwd := liveshare.NewPortForwarder(session, codespacesInternalSessionName, codespacesInternalPort, true) ch <- fwd.ForwardToListener(pfctx, listener) }() - // Attempt to connect to the port - opts := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - } - var conn *grpc.ClientConn - go func() { + // Attempt to connect to the port + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + } conn, err = grpc.DialContext(connectctx, localAddress, opts...) ch <- err // nil if we successfully connected }() @@ -95,15 +99,10 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie } } - g := &Client{ - conn: conn, - token: token, - listener: listener, - jupyterClient: jupyter.NewJupyterServerHostClient(conn), - cancelPF: pfcancel, - } + client.conn = conn + client.jupyterClient = jupyter.NewJupyterServerHostClient(conn) - return g, nil + return client, nil } // Closes the gRPC connection