From b0eb1b379acec0b034ae7887aaffc63ded6f75f1 Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Tue, 27 Sep 2022 11:34:30 -0700 Subject: [PATCH 01/14] Implement gRPC client --- go.mod | 3 + go.sum | 6 + pkg/grpc/client.go | 76 ++++++ .../jupyter/JupyterServerHostService.v1.pb.go | 241 ++++++++++++++++++ .../jupyter/JupyterServerHostService.v1.proto | 19 ++ .../JupyterServerHostService.v1_grpc.pb.go | 105 ++++++++ pkg/liveshare/client.go | 41 +++ pkg/liveshare/session.go | 28 +- 8 files changed, 496 insertions(+), 23 deletions(-) create mode 100644 pkg/grpc/client.go create mode 100644 pkg/grpc/jupyter/JupyterServerHostService.v1.pb.go create mode 100644 pkg/grpc/jupyter/JupyterServerHostService.v1.proto create mode 100644 pkg/grpc/jupyter/JupyterServerHostService.v1_grpc.pb.go diff --git a/go.mod b/go.mod index 08d4c245b..67d18003b 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,8 @@ require ( golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 golang.org/x/text v0.3.7 + google.golang.org/grpc v1.49.0 + google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -70,6 +72,7 @@ require ( github.com/yuin/goldmark-emoji v1.0.1 // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a // indirect + google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect ) diff --git a/go.sum b/go.sum index f24d36e0a..4b79794c5 100644 --- a/go.sum +++ b/go.sum @@ -485,6 +485,7 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA= google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 h1:PDIOdWxZ8eRizhKa1AAvY53xsvLB1cWorMjslvY3VA8= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -497,7 +498,10 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/grpc v1.31.0 h1:T7P4R73V3SSDPhH7WW7ATbfViLtmamH0DKrP3f9AuDI= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw= +google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -511,6 +515,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go new file mode 100644 index 000000000..ddc50bb13 --- /dev/null +++ b/pkg/grpc/client.go @@ -0,0 +1,76 @@ +package grpc + +// gRPC client implementation to be able to connect to the gRPC server and perform the following operations: +// - Start a remote JupyterLab server + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/cli/cli/v2/pkg/grpc/jupyter" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" +) + +const ( + requestTimeout = 30 * time.Second +) + +type GrpcClient struct { + conn *grpc.ClientConn + token string + jupyterClient jupyter.JupyterServerHostClient +} + +func New() *GrpcClient { + return &GrpcClient{} +} + +// Connects to the gRPC server on the given port +func (g *GrpcClient) Connect(ctx context.Context, port int, token string) error { + // Attempt to connect to the given port + conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + + if err != nil { + return fmt.Errorf("Failed to connect to the internal server on port %d", port) + } + + g.conn = conn + g.token = token + g.jupyterClient = jupyter.NewJupyterServerHostClient(conn) + + return nil +} + +// Appends the authentication token to the gRPC context +func (g *GrpcClient) appendMetadata(ctx context.Context) context.Context { + return metadata.AppendToOutgoingContext(ctx, "Authorization", "Bearer "+g.token) +} + +// Starts a remote JupyterLab server to allow the user to connect to the codespace via JupyterLab in their browser +func (g *GrpcClient) GetRunningServer() (int, string, error) { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + ctx = g.appendMetadata(ctx) + defer cancel() + + response, err := g.jupyterClient.GetRunningServer(ctx, &jupyter.GetRunningServerRequest{}) + + if err != nil { + return 0, "", fmt.Errorf("failed to invoke JupyterLab RPC: %w", err) + } + + if !response.Result { + return 0, "", fmt.Errorf("failed to start JupyterLab: %s", response.Message) + } + + port, err := strconv.Atoi(response.Port) + + if err != nil { + return 0, "", fmt.Errorf("failed to parse JupyterLab port: %w", err) + } + + return port, response.ServerUrl, err +} diff --git a/pkg/grpc/jupyter/JupyterServerHostService.v1.pb.go b/pkg/grpc/jupyter/JupyterServerHostService.v1.pb.go new file mode 100644 index 000000000..c48f3d0cf --- /dev/null +++ b/pkg/grpc/jupyter/JupyterServerHostService.v1.pb.go @@ -0,0 +1,241 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.21.3 +// source: JupyterServerHostService.v1.proto + +package jupyter + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetRunningServerRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetRunningServerRequest) Reset() { + *x = GetRunningServerRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_JupyterServerHostService_v1_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetRunningServerRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRunningServerRequest) ProtoMessage() {} + +func (x *GetRunningServerRequest) ProtoReflect() protoreflect.Message { + mi := &file_JupyterServerHostService_v1_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRunningServerRequest.ProtoReflect.Descriptor instead. +func (*GetRunningServerRequest) Descriptor() ([]byte, []int) { + return file_JupyterServerHostService_v1_proto_rawDescGZIP(), []int{0} +} + +type GetRunningServerResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Result bool `protobuf:"varint,1,opt,name=Result,proto3" json:"Result,omitempty"` + Message string `protobuf:"bytes,2,opt,name=Message,proto3" json:"Message,omitempty"` + Port string `protobuf:"bytes,3,opt,name=Port,proto3" json:"Port,omitempty"` + ServerUrl string `protobuf:"bytes,4,opt,name=ServerUrl,proto3" json:"ServerUrl,omitempty"` +} + +func (x *GetRunningServerResponse) Reset() { + *x = GetRunningServerResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_JupyterServerHostService_v1_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetRunningServerResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRunningServerResponse) ProtoMessage() {} + +func (x *GetRunningServerResponse) ProtoReflect() protoreflect.Message { + mi := &file_JupyterServerHostService_v1_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRunningServerResponse.ProtoReflect.Descriptor instead. +func (*GetRunningServerResponse) Descriptor() ([]byte, []int) { + return file_JupyterServerHostService_v1_proto_rawDescGZIP(), []int{1} +} + +func (x *GetRunningServerResponse) GetResult() bool { + if x != nil { + return x.Result + } + return false +} + +func (x *GetRunningServerResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *GetRunningServerResponse) GetPort() string { + if x != nil { + return x.Port + } + return "" +} + +func (x *GetRunningServerResponse) GetServerUrl() string { + if x != nil { + return x.ServerUrl + } + return "" +} + +var File_JupyterServerHostService_v1_proto protoreflect.FileDescriptor + +var file_JupyterServerHostService_v1_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x4a, 0x75, 0x70, 0x79, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x48, + 0x6f, 0x73, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x2b, 0x43, 0x6f, 0x64, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x73, 0x2e, + 0x47, 0x72, 0x70, 0x63, 0x2e, 0x4a, 0x75, 0x70, 0x79, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x48, 0x6f, 0x73, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x76, 0x31, + 0x22, 0x19, 0x0a, 0x17, 0x47, 0x65, 0x74, 0x52, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x7e, 0x0a, 0x18, 0x47, + 0x65, 0x74, 0x52, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, + 0x18, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x50, 0x6f, 0x72, + 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x1c, 0x0a, + 0x09, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x32, 0xb5, 0x01, 0x0a, 0x11, + 0x4a, 0x75, 0x70, 0x79, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x48, 0x6f, 0x73, + 0x74, 0x12, 0x9f, 0x01, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x52, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x44, 0x2e, 0x43, 0x6f, 0x64, 0x65, 0x73, 0x70, 0x61, + 0x63, 0x65, 0x73, 0x2e, 0x47, 0x72, 0x70, 0x63, 0x2e, 0x4a, 0x75, 0x70, 0x79, 0x74, 0x65, 0x72, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x48, 0x6f, 0x73, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x75, 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x45, 0x2e, 0x43, + 0x6f, 0x64, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x73, 0x2e, 0x47, 0x72, 0x70, 0x63, 0x2e, 0x4a, + 0x75, 0x70, 0x79, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x48, 0x6f, 0x73, 0x74, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x75, + 0x6e, 0x6e, 0x69, 0x6e, 0x67, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6a, 0x75, 0x70, 0x79, 0x74, 0x65, 0x72, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_JupyterServerHostService_v1_proto_rawDescOnce sync.Once + file_JupyterServerHostService_v1_proto_rawDescData = file_JupyterServerHostService_v1_proto_rawDesc +) + +func file_JupyterServerHostService_v1_proto_rawDescGZIP() []byte { + file_JupyterServerHostService_v1_proto_rawDescOnce.Do(func() { + file_JupyterServerHostService_v1_proto_rawDescData = protoimpl.X.CompressGZIP(file_JupyterServerHostService_v1_proto_rawDescData) + }) + return file_JupyterServerHostService_v1_proto_rawDescData +} + +var file_JupyterServerHostService_v1_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_JupyterServerHostService_v1_proto_goTypes = []interface{}{ + (*GetRunningServerRequest)(nil), // 0: Codespaces.Grpc.JupyterServerHostService.v1.GetRunningServerRequest + (*GetRunningServerResponse)(nil), // 1: Codespaces.Grpc.JupyterServerHostService.v1.GetRunningServerResponse +} +var file_JupyterServerHostService_v1_proto_depIdxs = []int32{ + 0, // 0: Codespaces.Grpc.JupyterServerHostService.v1.JupyterServerHost.GetRunningServer:input_type -> Codespaces.Grpc.JupyterServerHostService.v1.GetRunningServerRequest + 1, // 1: Codespaces.Grpc.JupyterServerHostService.v1.JupyterServerHost.GetRunningServer:output_type -> Codespaces.Grpc.JupyterServerHostService.v1.GetRunningServerResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_JupyterServerHostService_v1_proto_init() } +func file_JupyterServerHostService_v1_proto_init() { + if File_JupyterServerHostService_v1_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_JupyterServerHostService_v1_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetRunningServerRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_JupyterServerHostService_v1_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetRunningServerResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_JupyterServerHostService_v1_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_JupyterServerHostService_v1_proto_goTypes, + DependencyIndexes: file_JupyterServerHostService_v1_proto_depIdxs, + MessageInfos: file_JupyterServerHostService_v1_proto_msgTypes, + }.Build() + File_JupyterServerHostService_v1_proto = out.File + file_JupyterServerHostService_v1_proto_rawDesc = nil + file_JupyterServerHostService_v1_proto_goTypes = nil + file_JupyterServerHostService_v1_proto_depIdxs = nil +} diff --git a/pkg/grpc/jupyter/JupyterServerHostService.v1.proto b/pkg/grpc/jupyter/JupyterServerHostService.v1.proto new file mode 100644 index 000000000..337e7cf41 --- /dev/null +++ b/pkg/grpc/jupyter/JupyterServerHostService.v1.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option go_package = "./jupyter"; + +package Codespaces.Grpc.JupyterServerHostService.v1; + +service JupyterServerHost { + rpc GetRunningServer (GetRunningServerRequest) returns (GetRunningServerResponse); +} + +message GetRunningServerRequest { +} + +message GetRunningServerResponse { + bool Result = 1; + string Message = 2; + string Port = 3; + string ServerUrl = 4; +} diff --git a/pkg/grpc/jupyter/JupyterServerHostService.v1_grpc.pb.go b/pkg/grpc/jupyter/JupyterServerHostService.v1_grpc.pb.go new file mode 100644 index 000000000..8ba53014d --- /dev/null +++ b/pkg/grpc/jupyter/JupyterServerHostService.v1_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.3 +// source: JupyterServerHostService.v1.proto + +package jupyter + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// JupyterServerHostClient is the client API for JupyterServerHost service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type JupyterServerHostClient interface { + GetRunningServer(ctx context.Context, in *GetRunningServerRequest, opts ...grpc.CallOption) (*GetRunningServerResponse, error) +} + +type jupyterServerHostClient struct { + cc grpc.ClientConnInterface +} + +func NewJupyterServerHostClient(cc grpc.ClientConnInterface) JupyterServerHostClient { + return &jupyterServerHostClient{cc} +} + +func (c *jupyterServerHostClient) GetRunningServer(ctx context.Context, in *GetRunningServerRequest, opts ...grpc.CallOption) (*GetRunningServerResponse, error) { + out := new(GetRunningServerResponse) + err := c.cc.Invoke(ctx, "/Codespaces.Grpc.JupyterServerHostService.v1.JupyterServerHost/GetRunningServer", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// JupyterServerHostServer is the server API for JupyterServerHost service. +// All implementations must embed UnimplementedJupyterServerHostServer +// for forward compatibility +type JupyterServerHostServer interface { + GetRunningServer(context.Context, *GetRunningServerRequest) (*GetRunningServerResponse, error) + mustEmbedUnimplementedJupyterServerHostServer() +} + +// UnimplementedJupyterServerHostServer must be embedded to have forward compatible implementations. +type UnimplementedJupyterServerHostServer struct { +} + +func (UnimplementedJupyterServerHostServer) GetRunningServer(context.Context, *GetRunningServerRequest) (*GetRunningServerResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetRunningServer not implemented") +} +func (UnimplementedJupyterServerHostServer) mustEmbedUnimplementedJupyterServerHostServer() {} + +// UnsafeJupyterServerHostServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to JupyterServerHostServer will +// result in compilation errors. +type UnsafeJupyterServerHostServer interface { + mustEmbedUnimplementedJupyterServerHostServer() +} + +func RegisterJupyterServerHostServer(s grpc.ServiceRegistrar, srv JupyterServerHostServer) { + s.RegisterService(&JupyterServerHost_ServiceDesc, srv) +} + +func _JupyterServerHost_GetRunningServer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRunningServerRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(JupyterServerHostServer).GetRunningServer(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/Codespaces.Grpc.JupyterServerHostService.v1.JupyterServerHost/GetRunningServer", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(JupyterServerHostServer).GetRunningServer(ctx, req.(*GetRunningServerRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// JupyterServerHost_ServiceDesc is the grpc.ServiceDesc for JupyterServerHost service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var JupyterServerHost_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Codespaces.Grpc.JupyterServerHostService.v1.JupyterServerHost", + HandlerType: (*JupyterServerHostServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetRunningServer", + Handler: _JupyterServerHost_GetRunningServer_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "JupyterServerHostService.v1.proto", +} diff --git a/pkg/liveshare/client.go b/pkg/liveshare/client.go index 570e8615c..2fff9c766 100644 --- a/pkg/liveshare/client.go +++ b/pkg/liveshare/client.go @@ -15,13 +15,20 @@ import ( "crypto/tls" "errors" "fmt" + "net" "net/url" "strings" "time" + "github.com/cli/cli/v2/pkg/grpc" "github.com/opentracing/opentracing-go" ) +const ( + codespacesInternalPort = 16634 + codespacesInternalSessionName = "CodespacesInternal" +) + type logger interface { Println(v ...interface{}) Printf(f string, v ...interface{}) @@ -112,15 +119,49 @@ func Connect(ctx context.Context, opts Options) (*Session, error) { s := &Session{ ssh: ssh, rpc: rpc, + grpc: grpc.New(), clientName: opts.ClientName, keepAliveReason: make(chan string, 1), logger: opts.Logger, } go s.heartbeat(ctx, 1*time.Minute) + // Connect to the gRPC server so we can make requests anywhere we have access to the session + s.connectToGrpcServer(ctx, opts.SessionToken) + return s, nil } +// Connects to the gRPC server running on the host VM +func (s *Session) connectToGrpcServer(ctx context.Context, token string) error { + listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", 0)) + if err != nil { + return err + } + + // Tunnel the remote gRPC server port to the local port + localGrpcServerPort := listen.Addr().(*net.TCPAddr).Port + internalTunnelClosed := make(chan error, 1) + go func() { + fwd := NewPortForwarder(s, codespacesInternalSessionName, codespacesInternalPort, true) + internalTunnelClosed <- fwd.ForwardToListener(ctx, listen) + }() + + // Make a connection to the gRPC server + err = s.grpc.Connect(ctx, localGrpcServerPort, token) + + if err != nil { + return err + } + + select { + case err := <-internalTunnelClosed: + return fmt.Errorf("internal tunnel closed: %w", err) + default: + return nil // success + } +} + type clientCapabilities struct { IsNonInteractive bool `json:"isNonInteractive"` } diff --git a/pkg/liveshare/session.go b/pkg/liveshare/session.go index f912fa5ea..b1c5ab7ca 100644 --- a/pkg/liveshare/session.go +++ b/pkg/liveshare/session.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/cli/cli/v2/pkg/grpc" "github.com/opentracing/opentracing-go" "golang.org/x/crypto/ssh" "golang.org/x/sync/errgroup" @@ -21,8 +22,9 @@ type ChannelID struct { // A Session represents the session between a connected Live Share client and server. type Session struct { - ssh *sshSession - rpc *rpcClient + ssh *sshSession + rpc *rpcClient + grpc *grpc.GrpcClient clientName string keepAliveReason chan string @@ -100,27 +102,7 @@ func (s *Session) StartSSHServerWithOptions(ctx context.Context, options StartSS // StartJupyterServer starts a Juypyter server in the container and returns // the port on which it listens and the server URL. func (s *Session) StartJupyterServer(ctx context.Context) (int, string, error) { - var response struct { - Result bool `json:"result"` - Message string `json:"message"` - Port string `json:"port"` - ServerUrl string `json:"serverUrl"` - } - - if err := s.rpc.do(ctx, "IJupyterServerHostService.getRunningServer", []string{}, &response); err != nil { - return 0, "", fmt.Errorf("failed to invoke JupyterLab RPC: %w", err) - } - - if !response.Result { - return 0, "", fmt.Errorf("failed to start JupyterLab: %s", response.Message) - } - - port, err := strconv.Atoi(response.Port) - if err != nil { - return 0, "", fmt.Errorf("failed to parse JupyterLab port: %w", err) - } - - return port, response.ServerUrl, nil + return s.grpc.GetRunningServer() } // heartbeat runs until context cancellation, periodically checking whether there is a From 93f033fe8763a1660ff696061cd66bbfe4bf092f Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Tue, 27 Sep 2022 16:01:10 -0700 Subject: [PATCH 02/14] Address comments --- {pkg => internal/codespaces}/grpc/client.go | 40 ++++++++++++------- .../jupyter/JupyterServerHostService.v1.pb.go | 0 .../jupyter/JupyterServerHostService.v1.proto | 0 .../JupyterServerHostService.v1_grpc.pb.go | 0 pkg/liveshare/client.go | 15 ++++--- pkg/liveshare/session.go | 11 +++-- 6 files changed, 43 insertions(+), 23 deletions(-) rename {pkg => internal/codespaces}/grpc/client.go (56%) rename {pkg => internal/codespaces}/grpc/jupyter/JupyterServerHostService.v1.pb.go (100%) rename {pkg => internal/codespaces}/grpc/jupyter/JupyterServerHostService.v1.proto (100%) rename {pkg => internal/codespaces}/grpc/jupyter/JupyterServerHostService.v1_grpc.pb.go (100%) diff --git a/pkg/grpc/client.go b/internal/codespaces/grpc/client.go similarity index 56% rename from pkg/grpc/client.go rename to internal/codespaces/grpc/client.go index ddc50bb13..4ae5b35ed 100644 --- a/pkg/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -6,58 +6,71 @@ package grpc import ( "context" "fmt" + "net" "strconv" "time" - "github.com/cli/cli/v2/pkg/grpc/jupyter" + "github.com/cli/cli/v2/internal/codespaces/grpc/jupyter" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" ) const ( - requestTimeout = 30 * time.Second + connectionTimeout = 5 * time.Second + requestTimeout = 30 * time.Second ) -type GrpcClient struct { +type Client struct { conn *grpc.ClientConn token string + listener net.Listener jupyterClient jupyter.JupyterServerHostClient } -func New() *GrpcClient { - return &GrpcClient{} +func NewClient() *Client { + return &Client{} } // Connects to the gRPC server on the given port -func (g *GrpcClient) Connect(ctx context.Context, port int, token string) error { +func (g *Client) Connect(ctx context.Context, listener net.Listener, port int, token string) error { // Attempt to connect to the given port - conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) - + conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", port), grpc.WithTimeout(connectionTimeout), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) if err != nil { - return fmt.Errorf("Failed to connect to the internal server on port %d", port) + return err } g.conn = conn g.token = token + g.listener = listener g.jupyterClient = jupyter.NewJupyterServerHostClient(conn) return nil } +// Closes the gRPC connection +func (g *Client) Close() error { + // Closing the local listener effectively closes the gRPC connection + if err := g.listener.Close(); err != nil { + g.conn.Close() // If we fail to close the listener, explicitly close the gRPC connection and ignore any error + return fmt.Errorf("failed to close local tcp port listener: %w", err) + } + + return nil +} + // Appends the authentication token to the gRPC context -func (g *GrpcClient) appendMetadata(ctx context.Context) context.Context { +func (g *Client) appendMetadata(ctx context.Context) context.Context { return metadata.AppendToOutgoingContext(ctx, "Authorization", "Bearer "+g.token) } // Starts a remote JupyterLab server to allow the user to connect to the codespace via JupyterLab in their browser -func (g *GrpcClient) GetRunningServer() (int, string, error) { +func (g *Client) StartJupyterServer() (port int, serverUrl string, err error) { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx = g.appendMetadata(ctx) defer cancel() response, err := g.jupyterClient.GetRunningServer(ctx, &jupyter.GetRunningServerRequest{}) - if err != nil { return 0, "", fmt.Errorf("failed to invoke JupyterLab RPC: %w", err) } @@ -66,8 +79,7 @@ func (g *GrpcClient) GetRunningServer() (int, string, error) { return 0, "", fmt.Errorf("failed to start JupyterLab: %s", response.Message) } - port, err := strconv.Atoi(response.Port) - + port, err = strconv.Atoi(response.Port) if err != nil { return 0, "", fmt.Errorf("failed to parse JupyterLab port: %w", err) } diff --git a/pkg/grpc/jupyter/JupyterServerHostService.v1.pb.go b/internal/codespaces/grpc/jupyter/JupyterServerHostService.v1.pb.go similarity index 100% rename from pkg/grpc/jupyter/JupyterServerHostService.v1.pb.go rename to internal/codespaces/grpc/jupyter/JupyterServerHostService.v1.pb.go diff --git a/pkg/grpc/jupyter/JupyterServerHostService.v1.proto b/internal/codespaces/grpc/jupyter/JupyterServerHostService.v1.proto similarity index 100% rename from pkg/grpc/jupyter/JupyterServerHostService.v1.proto rename to internal/codespaces/grpc/jupyter/JupyterServerHostService.v1.proto diff --git a/pkg/grpc/jupyter/JupyterServerHostService.v1_grpc.pb.go b/internal/codespaces/grpc/jupyter/JupyterServerHostService.v1_grpc.pb.go similarity index 100% rename from pkg/grpc/jupyter/JupyterServerHostService.v1_grpc.pb.go rename to internal/codespaces/grpc/jupyter/JupyterServerHostService.v1_grpc.pb.go diff --git a/pkg/liveshare/client.go b/pkg/liveshare/client.go index 2fff9c766..5be3b1870 100644 --- a/pkg/liveshare/client.go +++ b/pkg/liveshare/client.go @@ -20,7 +20,7 @@ import ( "strings" "time" - "github.com/cli/cli/v2/pkg/grpc" + "github.com/cli/cli/v2/internal/codespaces/grpc" "github.com/opentracing/opentracing-go" ) @@ -119,7 +119,7 @@ func Connect(ctx context.Context, opts Options) (*Session, error) { s := &Session{ ssh: ssh, rpc: rpc, - grpc: grpc.New(), + grpc: grpc.NewClient(), clientName: opts.ClientName, keepAliveReason: make(chan string, 1), logger: opts.Logger, @@ -127,7 +127,10 @@ func Connect(ctx context.Context, opts Options) (*Session, error) { go s.heartbeat(ctx, 1*time.Minute) // Connect to the gRPC server so we can make requests anywhere we have access to the session - s.connectToGrpcServer(ctx, opts.SessionToken) + err = s.connectToGrpcServer(ctx, opts.SessionToken) + if err != nil { + return nil, fmt.Errorf("error connecting to internal server: %w", err) + } return s, nil } @@ -136,7 +139,7 @@ func Connect(ctx context.Context, opts Options) (*Session, error) { func (s *Session) connectToGrpcServer(ctx context.Context, token string) error { listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", 0)) if err != nil { - return err + return fmt.Errorf("failed to listen to local port over tcp: %w", err) } // Tunnel the remote gRPC server port to the local port @@ -148,10 +151,10 @@ func (s *Session) connectToGrpcServer(ctx context.Context, token string) error { }() // Make a connection to the gRPC server - err = s.grpc.Connect(ctx, localGrpcServerPort, token) + err = s.grpc.Connect(ctx, listen, localGrpcServerPort, token) if err != nil { - return err + return fmt.Errorf("failed to establish connection on port %d: %w", localGrpcServerPort, err) } select { diff --git a/pkg/liveshare/session.go b/pkg/liveshare/session.go index b1c5ab7ca..8207c8ba9 100644 --- a/pkg/liveshare/session.go +++ b/pkg/liveshare/session.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/cli/cli/v2/pkg/grpc" + "github.com/cli/cli/v2/internal/codespaces/grpc" "github.com/opentracing/opentracing-go" "golang.org/x/crypto/ssh" "golang.org/x/sync/errgroup" @@ -24,7 +24,7 @@ type ChannelID struct { type Session struct { ssh *sshSession rpc *rpcClient - grpc *grpc.GrpcClient + grpc *grpc.Client clientName string keepAliveReason chan string @@ -45,6 +45,11 @@ func (s *Session) Close() error { return fmt.Errorf("error while closing Live Share session: %w", err) } + // Close the connection to the gRPC server + if err := s.grpc.Close(); err != nil { + return fmt.Errorf("error while closing internal server connection: %w", err) + } + return nil } @@ -102,7 +107,7 @@ func (s *Session) StartSSHServerWithOptions(ctx context.Context, options StartSS // StartJupyterServer starts a Juypyter server in the container and returns // the port on which it listens and the server URL. func (s *Session) StartJupyterServer(ctx context.Context) (int, string, error) { - return s.grpc.GetRunningServer() + return s.grpc.StartJupyterServer() } // heartbeat runs until context cancellation, periodically checking whether there is a From 3f34d6d95e6e5e5a914dac13dcf0829a9fd2fce1 Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Wed, 28 Sep 2022 10:39:26 -0700 Subject: [PATCH 03/14] Make suggested gRPC client changes --- internal/codespaces/grpc/client.go | 46 +++++++++++++++++++++++------- pkg/cmd/codespace/jupyter.go | 9 +++++- pkg/liveshare/client.go | 44 ---------------------------- pkg/liveshare/session.go | 33 ++++++++++++++------- 4 files changed, 66 insertions(+), 66 deletions(-) diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index 4ae5b35ed..860893f68 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -11,6 +11,8 @@ import ( "time" "github.com/cli/cli/v2/internal/codespaces/grpc/jupyter" + "github.com/cli/cli/v2/pkg/liveshare" + "golang.org/x/crypto/ssh" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" @@ -21,6 +23,11 @@ const ( requestTimeout = 30 * time.Second ) +const ( + codespacesInternalPort = 16634 + codespacesInternalSessionName = "CodespacesInternal" +) + type Client struct { conn *grpc.ClientConn token string @@ -28,24 +35,41 @@ type Client struct { jupyterClient jupyter.JupyterServerHostClient } -func NewClient() *Client { - return &Client{} +type liveshareSession interface { + KeepAlive(string) + OpenStreamingChannel(context.Context, liveshare.ChannelID) (ssh.Channel, error) + StartSharing(context.Context, string, int) (liveshare.ChannelID, error) } // Connects to the gRPC server on the given port -func (g *Client) Connect(ctx context.Context, listener net.Listener, port int, token string) error { - // Attempt to connect to the given port - conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", port), grpc.WithTimeout(connectionTimeout), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) +func Connect(ctx context.Context, session liveshareSession, token string) (*Client, error) { + listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", 0)) if err != nil { - return err + return nil, fmt.Errorf("failed to listen to local port over tcp: %w", err) } - g.conn = conn - g.token = token - g.listener = listener - g.jupyterClient = jupyter.NewJupyterServerHostClient(conn) + // Tunnel the remote gRPC server port to the local port + localGrpcServerPort := listener.Addr().(*net.TCPAddr).Port + internalTunnelClosed := make(chan error, 1) + go func() { + fwd := liveshare.NewPortForwarder(session, codespacesInternalSessionName, codespacesInternalPort, true) + internalTunnelClosed <- fwd.ForwardToListener(ctx, listener) + }() - return nil + // Attempt to connect to the given port + conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", localGrpcServerPort), grpc.WithTimeout(connectionTimeout), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + if err != nil { + return nil, err + } + + g := &Client{ + conn: conn, + token: token, + listener: listener, + jupyterClient: jupyter.NewJupyterServerHostClient(conn), + } + + return g, nil } // Closes the gRPC connection diff --git a/pkg/cmd/codespace/jupyter.go b/pkg/cmd/codespace/jupyter.go index 77bedb301..bdeb2f4f4 100644 --- a/pkg/cmd/codespace/jupyter.go +++ b/pkg/cmd/codespace/jupyter.go @@ -6,6 +6,7 @@ import ( "net" "strings" + "github.com/cli/cli/v2/internal/codespaces/grpc" "github.com/cli/cli/v2/pkg/liveshare" "github.com/spf13/cobra" ) @@ -43,8 +44,14 @@ func (a *App) Jupyter(ctx context.Context, codespaceName string) (err error) { } defer safeClose(session, &err) + client, err := grpc.Connect(ctx, session, "") + if err != nil { + return fmt.Errorf("error connecting to internal server: %w", err) + } + defer safeClose(client, &err) + a.StartProgressIndicatorWithLabel("Starting JupyterLab on codespace") - serverPort, serverUrl, err := session.StartJupyterServer(ctx) + serverPort, serverUrl, err := client.StartJupyterServer() a.StopProgressIndicator() if err != nil { return fmt.Errorf("failed to start JupyterLab server: %w", err) diff --git a/pkg/liveshare/client.go b/pkg/liveshare/client.go index 5be3b1870..570e8615c 100644 --- a/pkg/liveshare/client.go +++ b/pkg/liveshare/client.go @@ -15,20 +15,13 @@ import ( "crypto/tls" "errors" "fmt" - "net" "net/url" "strings" "time" - "github.com/cli/cli/v2/internal/codespaces/grpc" "github.com/opentracing/opentracing-go" ) -const ( - codespacesInternalPort = 16634 - codespacesInternalSessionName = "CodespacesInternal" -) - type logger interface { Println(v ...interface{}) Printf(f string, v ...interface{}) @@ -119,52 +112,15 @@ func Connect(ctx context.Context, opts Options) (*Session, error) { s := &Session{ ssh: ssh, rpc: rpc, - grpc: grpc.NewClient(), clientName: opts.ClientName, keepAliveReason: make(chan string, 1), logger: opts.Logger, } go s.heartbeat(ctx, 1*time.Minute) - // Connect to the gRPC server so we can make requests anywhere we have access to the session - err = s.connectToGrpcServer(ctx, opts.SessionToken) - if err != nil { - return nil, fmt.Errorf("error connecting to internal server: %w", err) - } - return s, nil } -// Connects to the gRPC server running on the host VM -func (s *Session) connectToGrpcServer(ctx context.Context, token string) error { - listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", 0)) - if err != nil { - return fmt.Errorf("failed to listen to local port over tcp: %w", err) - } - - // Tunnel the remote gRPC server port to the local port - localGrpcServerPort := listen.Addr().(*net.TCPAddr).Port - internalTunnelClosed := make(chan error, 1) - go func() { - fwd := NewPortForwarder(s, codespacesInternalSessionName, codespacesInternalPort, true) - internalTunnelClosed <- fwd.ForwardToListener(ctx, listen) - }() - - // Make a connection to the gRPC server - err = s.grpc.Connect(ctx, listen, localGrpcServerPort, token) - - if err != nil { - return fmt.Errorf("failed to establish connection on port %d: %w", localGrpcServerPort, err) - } - - select { - case err := <-internalTunnelClosed: - return fmt.Errorf("internal tunnel closed: %w", err) - default: - return nil // success - } -} - type clientCapabilities struct { IsNonInteractive bool `json:"isNonInteractive"` } diff --git a/pkg/liveshare/session.go b/pkg/liveshare/session.go index 8207c8ba9..f912fa5ea 100644 --- a/pkg/liveshare/session.go +++ b/pkg/liveshare/session.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/cli/cli/v2/internal/codespaces/grpc" "github.com/opentracing/opentracing-go" "golang.org/x/crypto/ssh" "golang.org/x/sync/errgroup" @@ -22,9 +21,8 @@ type ChannelID struct { // A Session represents the session between a connected Live Share client and server. type Session struct { - ssh *sshSession - rpc *rpcClient - grpc *grpc.Client + ssh *sshSession + rpc *rpcClient clientName string keepAliveReason chan string @@ -45,11 +43,6 @@ func (s *Session) Close() error { return fmt.Errorf("error while closing Live Share session: %w", err) } - // Close the connection to the gRPC server - if err := s.grpc.Close(); err != nil { - return fmt.Errorf("error while closing internal server connection: %w", err) - } - return nil } @@ -107,7 +100,27 @@ func (s *Session) StartSSHServerWithOptions(ctx context.Context, options StartSS // StartJupyterServer starts a Juypyter server in the container and returns // the port on which it listens and the server URL. func (s *Session) StartJupyterServer(ctx context.Context) (int, string, error) { - return s.grpc.StartJupyterServer() + var response struct { + Result bool `json:"result"` + Message string `json:"message"` + Port string `json:"port"` + ServerUrl string `json:"serverUrl"` + } + + if err := s.rpc.do(ctx, "IJupyterServerHostService.getRunningServer", []string{}, &response); err != nil { + return 0, "", fmt.Errorf("failed to invoke JupyterLab RPC: %w", err) + } + + if !response.Result { + return 0, "", fmt.Errorf("failed to start JupyterLab: %s", response.Message) + } + + port, err := strconv.Atoi(response.Port) + if err != nil { + return 0, "", fmt.Errorf("failed to parse JupyterLab port: %w", err) + } + + return port, response.ServerUrl, nil } // heartbeat runs until context cancellation, periodically checking whether there is a From 35d2cf30d0b0cc12119497cc0136f0ee8340a6ad Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Wed, 28 Sep 2022 11:28:35 -0700 Subject: [PATCH 04/14] Send session token --- pkg/cmd/codespace/jupyter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/codespace/jupyter.go b/pkg/cmd/codespace/jupyter.go index bdeb2f4f4..b5ce34a9f 100644 --- a/pkg/cmd/codespace/jupyter.go +++ b/pkg/cmd/codespace/jupyter.go @@ -44,7 +44,7 @@ func (a *App) Jupyter(ctx context.Context, codespaceName string) (err error) { } defer safeClose(session, &err) - client, err := grpc.Connect(ctx, session, "") + client, err := grpc.Connect(ctx, session, codespace.Connection.SessionToken) if err != nil { return fmt.Errorf("error connecting to internal server: %w", err) } From 766e6a2314b176a8000f23284b7368fbcdc573c3 Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Thu, 29 Sep 2022 09:58:44 -0700 Subject: [PATCH 05/14] Use existing context --- internal/codespaces/grpc/client.go | 4 ++-- pkg/cmd/codespace/jupyter.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index 860893f68..5bfa179ba 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -89,8 +89,8 @@ func (g *Client) appendMetadata(ctx context.Context) context.Context { } // Starts a remote JupyterLab server to allow the user to connect to the codespace via JupyterLab in their browser -func (g *Client) StartJupyterServer() (port int, serverUrl string, err error) { - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) +func (g *Client) StartJupyterServer(ctx context.Context) (port int, serverUrl string, err error) { + ctx, cancel := context.WithTimeout(ctx, requestTimeout) ctx = g.appendMetadata(ctx) defer cancel() diff --git a/pkg/cmd/codespace/jupyter.go b/pkg/cmd/codespace/jupyter.go index b5ce34a9f..bc1d6d2fe 100644 --- a/pkg/cmd/codespace/jupyter.go +++ b/pkg/cmd/codespace/jupyter.go @@ -51,7 +51,7 @@ func (a *App) Jupyter(ctx context.Context, codespaceName string) (err error) { defer safeClose(client, &err) a.StartProgressIndicatorWithLabel("Starting JupyterLab on codespace") - serverPort, serverUrl, err := client.StartJupyterServer() + serverPort, serverUrl, err := client.StartJupyterServer(ctx) a.StopProgressIndicator() if err != nil { return fmt.Errorf("failed to start JupyterLab server: %w", err) From d02ff315e42ab129951c912328777ddd997aa7aa Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Thu, 29 Sep 2022 10:27:17 -0700 Subject: [PATCH 06/14] Fix linting errors --- go.sum | 2 -- internal/codespaces/grpc/client.go | 7 ++++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go.sum b/go.sum index 4b79794c5..66d90b82e 100644 --- a/go.sum +++ b/go.sum @@ -498,7 +498,6 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.31.0 h1:T7P4R73V3SSDPhH7WW7ATbfViLtmamH0DKrP3f9AuDI= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw= google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= @@ -513,7 +512,6 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index 5bfa179ba..ab7eeca5f 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -57,7 +57,12 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie }() // Attempt to connect to the given port - conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", localGrpcServerPort), grpc.WithTimeout(connectionTimeout), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + } + ctx, _ = context.WithTimeout(ctx, connectionTimeout) + conn, err := grpc.DialContext(ctx, fmt.Sprintf("127.0.0.1:%d", localGrpcServerPort), opts...) if err != nil { return nil, err } From f947020fa5b957b89d4f7be200e70bf2f2a47b48 Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Fri, 30 Sep 2022 11:55:57 -0700 Subject: [PATCH 07/14] Add grpc mock server + tests --- internal/codespaces/grpc/client.go | 21 ++++++-- internal/codespaces/grpc/client_test.go | 68 +++++++++++++++++++++++++ internal/codespaces/grpc/test/server.go | 52 +++++++++++++++++++ 3 files changed, 137 insertions(+), 4 deletions(-) create mode 100644 internal/codespaces/grpc/client_test.go create mode 100644 internal/codespaces/grpc/test/server.go diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index ab7eeca5f..1c8a632d6 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -41,7 +41,7 @@ type liveshareSession interface { StartSharing(context.Context, string, int) (liveshare.ChannelID, error) } -// Connects to the gRPC server on the given port +// Finds a free port to listen on and creates a new gRPC client that connects to that port func Connect(ctx context.Context, session liveshareSession, token string) (*Client, error) { listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", 0)) if err != nil { @@ -49,20 +49,34 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie } // Tunnel the remote gRPC server port to the local port - localGrpcServerPort := listener.Addr().(*net.TCPAddr).Port + localPort := listener.Addr().(*net.TCPAddr).Port internalTunnelClosed := make(chan error, 1) go func() { fwd := liveshare.NewPortForwarder(session, codespacesInternalSessionName, codespacesInternalPort, true) internalTunnelClosed <- fwd.ForwardToListener(ctx, listener) }() + // Create the gRPC client + client, err := NewClient(ctx, session, token, localPort) + if err != nil { + return nil, fmt.Errorf("failed to create client: %w", err) + } + + // Attach the listener so we can close it later + client.listener = listener + + return client, err +} + +// Creates a new gRPC client that connects to the given port +func NewClient(ctx context.Context, session liveshareSession, token string, localPort int) (*Client, error) { // Attempt to connect to the given port opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), } ctx, _ = context.WithTimeout(ctx, connectionTimeout) - conn, err := grpc.DialContext(ctx, fmt.Sprintf("127.0.0.1:%d", localGrpcServerPort), opts...) + conn, err := grpc.DialContext(ctx, fmt.Sprintf("127.0.0.1:%d", localPort), opts...) if err != nil { return nil, err } @@ -70,7 +84,6 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie g := &Client{ conn: conn, token: token, - listener: listener, jupyterClient: jupyter.NewJupyterServerHostClient(conn), } diff --git a/internal/codespaces/grpc/client_test.go b/internal/codespaces/grpc/client_test.go new file mode 100644 index 000000000..03dc7405a --- /dev/null +++ b/internal/codespaces/grpc/client_test.go @@ -0,0 +1,68 @@ +package grpc + +import ( + "context" + "fmt" + "net" + "testing" + + "github.com/cli/cli/v2/internal/codespaces/grpc/test" +) + +func TestMain(m *testing.M) { + // Start the gRPC server in the background + go func() { + err := test.StartServer() + if err != nil { + panic(err) + } + }() + + m.Run() +} + +func connect(t *testing.T) (ctx context.Context, client *Client) { + ctx = context.Background() + client, err := NewClient(ctx, nil, "token", test.ServerPort) + client.listener = &net.TCPListener{} // mock listener so the close function doesn't panic + if err != nil { + t.Fatalf("error connecting to internal server: %v", err) + } + + return ctx, client +} + +// Test that the gRPC client returns the correct port and URL when the JupyterLab server starts successfully +func TestStartJupyterServerSuccess(t *testing.T) { + ctx, client := connect(t) + defer client.Close() + port, url, err := client.StartJupyterServer(ctx) + if err != nil { + t.Fatalf("expected %v, got %v", nil, err) + } + if port != test.JupyterPort { + t.Fatalf("expected %d, got %d", test.JupyterPort, port) + } + if url != test.JupyterServerUrl { + t.Fatalf("expected %s, got %s", test.JupyterServerUrl, url) + } +} + +// Test that the gRPC client returns an error when the JupyterLab server fails to start +func TestStartJupyterServerFailure(t *testing.T) { + ctx, client := connect(t) + defer client.Close() + test.JupyterMessage = "error message" + test.JupyterResult = false + errorMessage := fmt.Sprintf("failed to start JupyterLab: %s", test.JupyterMessage) + port, url, err := client.StartJupyterServer(ctx) + if err.Error() != errorMessage { + t.Fatalf("expected %v, got %v", errorMessage, err) + } + if port != 0 { + t.Fatalf("expected %d, got %d", 0, port) + } + if url != "" { + t.Fatalf("expected %s, got %s", "", url) + } +} diff --git a/internal/codespaces/grpc/test/server.go b/internal/codespaces/grpc/test/server.go new file mode 100644 index 000000000..50608a9fa --- /dev/null +++ b/internal/codespaces/grpc/test/server.go @@ -0,0 +1,52 @@ +package test + +import ( + "context" + "fmt" + "net" + "strconv" + + "github.com/cli/cli/v2/internal/codespaces/grpc/jupyter" + "google.golang.org/grpc" +) + +const ( + ServerPort = 50051 +) + +var ( + JupyterPort = 1234 + JupyterServerUrl = "http://localhost:1234?token=1234" + JupyterMessage = "" + JupyterResult = true +) + +type server struct { + jupyter.UnimplementedJupyterServerHostServer +} + +func (s *server) GetRunningServer(ctx context.Context, in *jupyter.GetRunningServerRequest) (*jupyter.GetRunningServerResponse, error) { + return &jupyter.GetRunningServerResponse{ + Port: strconv.Itoa(JupyterPort), + ServerUrl: JupyterServerUrl, + Message: JupyterMessage, + Result: JupyterResult, + }, nil +} + +// Starts the mock gRPC server listening on port 50051 +func StartServer() error { + listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", ServerPort)) + if err != nil { + return fmt.Errorf("failed to listen: %v", err) + } + defer listener.Close() + + s := grpc.NewServer() + jupyter.RegisterJupyterServerHostServer(s, &server{}) + if err := s.Serve(listener); err != nil { + return fmt.Errorf("failed to serve: %v", err) + } + + return nil +} From 341fc6c3f7032a23fc1f2d031bbaa83f84462c6e Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Mon, 3 Oct 2022 20:29:31 -0700 Subject: [PATCH 08/14] Mock session/channel for grpc tests --- internal/codespaces/grpc/client.go | 17 +++--------- internal/codespaces/grpc/client_test.go | 11 ++++---- internal/codespaces/grpc/test/channel.go | 34 ++++++++++++++++++++++++ internal/codespaces/grpc/test/session.go | 33 +++++++++++++++++++++++ 4 files changed, 76 insertions(+), 19 deletions(-) create mode 100644 internal/codespaces/grpc/test/channel.go create mode 100644 internal/codespaces/grpc/test/session.go diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index 1c8a632d6..7ceeac7d1 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -56,21 +56,9 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie internalTunnelClosed <- fwd.ForwardToListener(ctx, listener) }() - // Create the gRPC client - client, err := NewClient(ctx, session, token, localPort) - if err != nil { - return nil, fmt.Errorf("failed to create client: %w", err) - } + time.Sleep(time.Millisecond) - // Attach the listener so we can close it later - client.listener = listener - - return client, err -} - -// Creates a new gRPC client that connects to the given port -func NewClient(ctx context.Context, session liveshareSession, token string, localPort int) (*Client, error) { - // Attempt to connect to the given port + // Attempt to connect to the port opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), @@ -84,6 +72,7 @@ func NewClient(ctx context.Context, session liveshareSession, token string, loca g := &Client{ conn: conn, token: token, + listener: listener, jupyterClient: jupyter.NewJupyterServerHostClient(conn), } diff --git a/internal/codespaces/grpc/client_test.go b/internal/codespaces/grpc/client_test.go index 03dc7405a..0279803ea 100644 --- a/internal/codespaces/grpc/client_test.go +++ b/internal/codespaces/grpc/client_test.go @@ -3,7 +3,6 @@ package grpc import ( "context" "fmt" - "net" "testing" "github.com/cli/cli/v2/internal/codespaces/grpc/test" @@ -22,20 +21,23 @@ func TestMain(m *testing.M) { } func connect(t *testing.T) (ctx context.Context, client *Client) { + t.Helper() ctx = context.Background() - client, err := NewClient(ctx, nil, "token", test.ServerPort) - client.listener = &net.TCPListener{} // mock listener so the close function doesn't panic + client, err := Connect(ctx, &test.Session{}, "token") if err != nil { t.Fatalf("error connecting to internal server: %v", err) } + t.Cleanup(func() { + client.Close() + }) + return ctx, client } // Test that the gRPC client returns the correct port and URL when the JupyterLab server starts successfully func TestStartJupyterServerSuccess(t *testing.T) { ctx, client := connect(t) - defer client.Close() port, url, err := client.StartJupyterServer(ctx) if err != nil { t.Fatalf("expected %v, got %v", nil, err) @@ -51,7 +53,6 @@ func TestStartJupyterServerSuccess(t *testing.T) { // Test that the gRPC client returns an error when the JupyterLab server fails to start func TestStartJupyterServerFailure(t *testing.T) { ctx, client := connect(t) - defer client.Close() test.JupyterMessage = "error message" test.JupyterResult = false errorMessage := fmt.Sprintf("failed to start JupyterLab: %s", test.JupyterMessage) diff --git a/internal/codespaces/grpc/test/channel.go b/internal/codespaces/grpc/test/channel.go new file mode 100644 index 000000000..eef42c4aa --- /dev/null +++ b/internal/codespaces/grpc/test/channel.go @@ -0,0 +1,34 @@ +package test + +import ( + "io" + "net" +) + +type Channel struct { + conn net.Conn +} + +func (c *Channel) Read(data []byte) (int, error) { + return c.conn.Read(data) +} + +func (c *Channel) Write(data []byte) (int, error) { + return c.conn.Write(data) +} + +func (c *Channel) Close() error { + return c.conn.Close() +} + +func (c *Channel) CloseWrite() error { + return nil +} + +func (c *Channel) SendRequest(name string, wantReply bool, payload []byte) (bool, error) { + return false, nil +} + +func (c *Channel) Stderr() io.ReadWriter { + return nil +} diff --git a/internal/codespaces/grpc/test/session.go b/internal/codespaces/grpc/test/session.go new file mode 100644 index 000000000..e29027dc6 --- /dev/null +++ b/internal/codespaces/grpc/test/session.go @@ -0,0 +1,33 @@ +package test + +import ( + "context" + "fmt" + "log" + "net" + + "github.com/cli/cli/v2/pkg/liveshare" + "golang.org/x/crypto/ssh" +) + +type Session struct { +} + +func (s *Session) KeepAlive(reason string) { + return +} + +func (s *Session) StartSharing(ctx context.Context, sessionName string, port int) (liveshare.ChannelID, error) { + return liveshare.ChannelID{}, nil +} + +func (s *Session) OpenStreamingChannel(ctx context.Context, id liveshare.ChannelID) (ssh.Channel, error) { + dialer := net.Dialer{} + conn, err := dialer.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", ServerPort)) + if err != nil { + log.Fatalf("failed to connect to the grpc server: %v", err) + } + return &Channel{ + conn: conn, + }, nil +} From a090b17e382a7f82630090e335ebe11e2cbf2d31 Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Fri, 7 Oct 2022 15:37:04 -0700 Subject: [PATCH 09/14] Ensure port is forwarded and server is shared --- internal/codespaces/grpc/client.go | 7 ++-- internal/codespaces/grpc/test/session.go | 1 + pkg/liveshare/port_forwarder.go | 49 ++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index 7ceeac7d1..cf8137006 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -49,14 +49,15 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie } // Tunnel the remote gRPC server port to the local port - localPort := listener.Addr().(*net.TCPAddr).Port + localAddress := fmt.Sprintf("127.0.0.1:%d", listener.Addr().(*net.TCPAddr).Port) internalTunnelClosed := make(chan error, 1) go func() { fwd := liveshare.NewPortForwarder(session, codespacesInternalSessionName, codespacesInternalPort, true) internalTunnelClosed <- fwd.ForwardToListener(ctx, listener) }() - time.Sleep(time.Millisecond) + // Ping the port to ensure that it is fully forwarded before continuing + liveshare.WaitForPortConnection(ctx, localAddress) // Attempt to connect to the port opts := []grpc.DialOption{ @@ -64,7 +65,7 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie grpc.WithBlock(), } ctx, _ = context.WithTimeout(ctx, connectionTimeout) - conn, err := grpc.DialContext(ctx, fmt.Sprintf("127.0.0.1:%d", localPort), opts...) + conn, err := grpc.DialContext(ctx, localAddress, opts...) if err != nil { return nil, err } diff --git a/internal/codespaces/grpc/test/session.go b/internal/codespaces/grpc/test/session.go index e29027dc6..ec4d69649 100644 --- a/internal/codespaces/grpc/test/session.go +++ b/internal/codespaces/grpc/test/session.go @@ -21,6 +21,7 @@ func (s *Session) StartSharing(ctx context.Context, sessionName string, port int return liveshare.ChannelID{}, nil } +// Creates mock SSH channel connected to the mock gRPC server func (s *Session) OpenStreamingChannel(ctx context.Context, id liveshare.ChannelID) (ssh.Channel, error) { dialer := net.Dialer{} conn, err := dialer.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", ServerPort)) diff --git a/pkg/liveshare/port_forwarder.go b/pkg/liveshare/port_forwarder.go index f042eeaea..33dc6d1fa 100644 --- a/pkg/liveshare/port_forwarder.go +++ b/pkg/liveshare/port_forwarder.go @@ -5,9 +5,15 @@ import ( "fmt" "io" "net" + "time" "github.com/opentracing/opentracing-go" "golang.org/x/crypto/ssh" + "golang.org/x/sync/errgroup" +) + +const ( + connectionTimeout = 30 * time.Second ) type portForwardingSession interface { @@ -54,6 +60,12 @@ func (fwd *PortForwarder) ForwardToListener(ctx context.Context, listen net.List return err } + // Ping the port to ensure that it is fully forwarded before continuing + err = WaitForPortConnection(ctx, listen.Addr().String()) + if err != nil { + return err + } + errc := make(chan error, 1) sendError := func(err error) { // Use non-blocking send, to avoid goroutines getting @@ -99,6 +111,43 @@ func (fwd *PortForwarder) Forward(ctx context.Context, conn io.ReadWriteCloser) return awaitError(ctx, errc) } +// Connects to and pings a given address to ensure that the server is shared and the port is forwarded. +func WaitForPortConnection(ctx context.Context, address string) error { + waitCtx, cancel := context.WithTimeout(ctx, connectionTimeout) + g, waitCtx := errgroup.WithContext(waitCtx) + defer cancel() + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + g.Go(func() error { + for { + select { + case <-waitCtx.Done(): + return fmt.Errorf("timed out waiting for connection") + case <-ticker.C: + // Verify that the port can be connected to + conn, err := net.Dial("tcp", address) + if err != nil { + continue + } + + defer conn.Close() + + // Send a ping and make sure it succeed + _, err = conn.Write([]byte("ping")) + if err != nil { + continue + } + + return nil + } + } + }) + + return g.Wait() +} + func (fwd *PortForwarder) shareRemotePort(ctx context.Context) (ChannelID, error) { id, err := fwd.session.StartSharing(ctx, fwd.name, fwd.remotePort) if err != nil { From 0f41ccc472b4b4273973b05851bb2b9b787f296a Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Mon, 10 Oct 2022 11:56:44 -0700 Subject: [PATCH 10/14] Have cilents call port connection function --- internal/codespaces/grpc/client.go | 12 ++++++++---- pkg/liveshare/port_forwarder.go | 19 +++---------------- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index cf8137006..ea8399683 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -19,8 +19,9 @@ import ( ) const ( - connectionTimeout = 5 * time.Second - requestTimeout = 30 * time.Second + serverConnectionTimeout = 5 * time.Second + requestTimeout = 30 * time.Second + portConnectionTimeout = 30 * time.Second ) const ( @@ -57,14 +58,17 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie }() // Ping the port to ensure that it is fully forwarded before continuing - liveshare.WaitForPortConnection(ctx, localAddress) + err = liveshare.WaitForPortConnection(ctx, localAddress, portConnectionTimeout) + if err != nil { + return nil, fmt.Errorf("failed to connect to local port: %w", err) + } // Attempt to connect to the port opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), } - ctx, _ = context.WithTimeout(ctx, connectionTimeout) + ctx, _ = context.WithTimeout(ctx, serverConnectionTimeout) conn, err := grpc.DialContext(ctx, localAddress, opts...) if err != nil { return nil, err diff --git a/pkg/liveshare/port_forwarder.go b/pkg/liveshare/port_forwarder.go index 33dc6d1fa..ec23fa3d3 100644 --- a/pkg/liveshare/port_forwarder.go +++ b/pkg/liveshare/port_forwarder.go @@ -12,10 +12,6 @@ import ( "golang.org/x/sync/errgroup" ) -const ( - connectionTimeout = 30 * time.Second -) - type portForwardingSession interface { StartSharing(context.Context, string, int) (ChannelID, error) OpenStreamingChannel(context.Context, ChannelID) (ssh.Channel, error) @@ -60,12 +56,6 @@ func (fwd *PortForwarder) ForwardToListener(ctx context.Context, listen net.List return err } - // Ping the port to ensure that it is fully forwarded before continuing - err = WaitForPortConnection(ctx, listen.Addr().String()) - if err != nil { - return err - } - errc := make(chan error, 1) sendError := func(err error) { // Use non-blocking send, to avoid goroutines getting @@ -112,20 +102,17 @@ func (fwd *PortForwarder) Forward(ctx context.Context, conn io.ReadWriteCloser) } // Connects to and pings a given address to ensure that the server is shared and the port is forwarded. -func WaitForPortConnection(ctx context.Context, address string) error { - waitCtx, cancel := context.WithTimeout(ctx, connectionTimeout) +func WaitForPortConnection(ctx context.Context, address string, timeout time.Duration) error { + waitCtx, cancel := context.WithTimeout(ctx, timeout) g, waitCtx := errgroup.WithContext(waitCtx) defer cancel() - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - g.Go(func() error { for { select { case <-waitCtx.Done(): return fmt.Errorf("timed out waiting for connection") - case <-ticker.C: + default: // Verify that the port can be connected to conn, err := net.Dial("tcp", address) if err != nil { From f89b1b6a453d5af4a7bd81655590e9d4e65c7037 Mon Sep 17 00:00:00 2001 From: David Gardiner Date: Mon, 10 Oct 2022 14:39:12 -0700 Subject: [PATCH 11/14] Create timeout context in caller --- internal/codespaces/grpc/client.go | 7 ++- internal/codespaces/grpc/client_test.go | 2 + internal/codespaces/grpc/test/session.go | 1 - pkg/cmd/codespace/jupyter.go | 2 +- pkg/liveshare/port_forwarder.go | 63 ++++++++++++------------ 5 files changed, 39 insertions(+), 36 deletions(-) diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index ea8399683..b4c5a0394 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -58,7 +58,9 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie }() // Ping the port to ensure that it is fully forwarded before continuing - err = liveshare.WaitForPortConnection(ctx, localAddress, portConnectionTimeout) + connctx, cancel := context.WithTimeout(ctx, portConnectionTimeout) + defer cancel() + err = liveshare.WaitForPortConnection(connctx, localAddress) if err != nil { return nil, fmt.Errorf("failed to connect to local port: %w", err) } @@ -68,7 +70,8 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), } - ctx, _ = context.WithTimeout(ctx, serverConnectionTimeout) + ctx, cancel = context.WithTimeout(ctx, serverConnectionTimeout) + defer cancel() conn, err := grpc.DialContext(ctx, localAddress, opts...) if err != nil { return nil, err diff --git a/internal/codespaces/grpc/client_test.go b/internal/codespaces/grpc/client_test.go index 0279803ea..62a7b1723 100644 --- a/internal/codespaces/grpc/client_test.go +++ b/internal/codespaces/grpc/client_test.go @@ -3,6 +3,7 @@ package grpc import ( "context" "fmt" + "os" "testing" "github.com/cli/cli/v2/internal/codespaces/grpc/test" @@ -18,6 +19,7 @@ func TestMain(m *testing.M) { }() m.Run() + os.Exit(0) } func connect(t *testing.T) (ctx context.Context, client *Client) { diff --git a/internal/codespaces/grpc/test/session.go b/internal/codespaces/grpc/test/session.go index ec4d69649..70d81d41e 100644 --- a/internal/codespaces/grpc/test/session.go +++ b/internal/codespaces/grpc/test/session.go @@ -14,7 +14,6 @@ type Session struct { } func (s *Session) KeepAlive(reason string) { - return } func (s *Session) StartSharing(ctx context.Context, sessionName string, port int) (liveshare.ChannelID, error) { diff --git a/pkg/cmd/codespace/jupyter.go b/pkg/cmd/codespace/jupyter.go index bc1d6d2fe..c37a97ea7 100644 --- a/pkg/cmd/codespace/jupyter.go +++ b/pkg/cmd/codespace/jupyter.go @@ -44,13 +44,13 @@ func (a *App) Jupyter(ctx context.Context, codespaceName string) (err error) { } defer safeClose(session, &err) + a.StartProgressIndicatorWithLabel("Starting JupyterLab on codespace") client, err := grpc.Connect(ctx, session, codespace.Connection.SessionToken) if err != nil { return fmt.Errorf("error connecting to internal server: %w", err) } defer safeClose(client, &err) - a.StartProgressIndicatorWithLabel("Starting JupyterLab on codespace") serverPort, serverUrl, err := client.StartJupyterServer(ctx) a.StopProgressIndicator() if err != nil { diff --git a/pkg/liveshare/port_forwarder.go b/pkg/liveshare/port_forwarder.go index ec23fa3d3..923415c01 100644 --- a/pkg/liveshare/port_forwarder.go +++ b/pkg/liveshare/port_forwarder.go @@ -5,11 +5,9 @@ import ( "fmt" "io" "net" - "time" "github.com/opentracing/opentracing-go" "golang.org/x/crypto/ssh" - "golang.org/x/sync/errgroup" ) type portForwardingSession interface { @@ -101,38 +99,39 @@ func (fwd *PortForwarder) Forward(ctx context.Context, conn io.ReadWriteCloser) return awaitError(ctx, errc) } -// Connects to and pings a given address to ensure that the server is shared and the port is forwarded. -func WaitForPortConnection(ctx context.Context, address string, timeout time.Duration) error { - waitCtx, cancel := context.WithTimeout(ctx, timeout) - g, waitCtx := errgroup.WithContext(waitCtx) - defer cancel() - - g.Go(func() error { - for { - select { - case <-waitCtx.Done(): - return fmt.Errorf("timed out waiting for connection") - default: - // Verify that the port can be connected to - conn, err := net.Dial("tcp", address) - if err != nil { - continue - } - - defer conn.Close() - - // Send a ping and make sure it succeed - _, err = conn.Write([]byte("ping")) - if err != nil { - continue - } - - return nil +// Loops until we can connect to the address or the context is canceled. +func WaitForPortConnection(ctx context.Context, address string) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + err := connectToAddr(address) + if err != nil { + continue } - } - }) - return g.Wait() + return nil // success + } + } +} + +// Connects to and pings a given address to ensure that the server is shared and the port is forwarded. +func connectToAddr(address string) error { + // Verify that the port can be connected to + conn, err := net.Dial("tcp", address) + if err != nil { + return err + } + defer conn.Close() + + // Send a ping and make sure it succeed + _, err = conn.Write([]byte("ping")) + if err != nil { + return err + } + + return nil } func (fwd *PortForwarder) shareRemotePort(ctx context.Context) (ChannelID, error) { From e11b43f8f6d01c768784dc594fad5a22b12c6c89 Mon Sep 17 00:00:00 2001 From: Jose Garcia Date: Tue, 11 Oct 2022 08:55:14 -0400 Subject: [PATCH 12/14] Fixes for handling the grpc client lifecyle --- internal/codespaces/grpc/client.go | 65 +++++++++++++++--------- internal/codespaces/grpc/client_test.go | 55 +++++++++++--------- internal/codespaces/grpc/test/server.go | 20 ++++++-- internal/codespaces/grpc/test/session.go | 16 +++--- pkg/cmd/codespace/jupyter.go | 31 +++++++++-- pkg/liveshare/port_forwarder.go | 35 ------------- 6 files changed, 123 insertions(+), 99 deletions(-) diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index b4c5a0394..5ced382f8 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -19,9 +19,8 @@ import ( ) const ( - serverConnectionTimeout = 5 * time.Second - requestTimeout = 30 * time.Second - portConnectionTimeout = 30 * time.Second + ConnectionTimeout = 5 * time.Second + RequestTimeout = 30 * time.Second ) const ( @@ -34,6 +33,7 @@ type Client struct { token string listener net.Listener jupyterClient jupyter.JupyterServerHostClient + cancelPF context.CancelFunc } type liveshareSession interface { @@ -49,32 +49,50 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie return nil, fmt.Errorf("failed to listen to local port over tcp: %w", err) } - // Tunnel the remote gRPC server port to the local port - localAddress := fmt.Sprintf("127.0.0.1:%d", listener.Addr().(*net.TCPAddr).Port) - internalTunnelClosed := make(chan error, 1) - go func() { - fwd := liveshare.NewPortForwarder(session, codespacesInternalSessionName, codespacesInternalPort, true) - internalTunnelClosed <- fwd.ForwardToListener(ctx, listener) + // Create a cancelable context to be able to cancel background tasks + // if we encounter an error while connecting to the gRPC server + connectctx, cancel := context.WithCancel(context.Background()) + defer func() { + if err != nil { + cancel() + } }() - // Ping the port to ensure that it is fully forwarded before continuing - connctx, cancel := context.WithTimeout(ctx, portConnectionTimeout) - defer cancel() - err = liveshare.WaitForPortConnection(connctx, localAddress) - if err != nil { - return nil, fmt.Errorf("failed to connect to local port: %w", err) - } + // Ensure we close the port forwarder if we encounter an error + // or once the gRPC connection is closed. pfcancel is retained + // to close the PF whenever we close the gRPC connection. + pfctx, pfcancel := context.WithCancel(connectctx) + + ch := make(chan error, 2) // Buffered channel to ensure we don't block on the goroutine + + // Tunnel the remote gRPC server port to the local port + localAddress := fmt.Sprintf("127.0.0.1:%d", listener.Addr().(*net.TCPAddr).Port) + go func() { + fwd := liveshare.NewPortForwarder(session, codespacesInternalSessionName, codespacesInternalPort, true) + ch <- fwd.ForwardToListener(pfctx, listener) + }() // Attempt to connect to the port opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), } - ctx, cancel = context.WithTimeout(ctx, serverConnectionTimeout) - defer cancel() - conn, err := grpc.DialContext(ctx, localAddress, opts...) - if err != nil { - return nil, err + + var conn *grpc.ClientConn + + go func() { + conn, err = grpc.DialContext(connectctx, localAddress, opts...) + ch <- err // nil if we successfully connected + }() + + // Wait for the connection to be established or for the context to be cancelled + select { + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-ch: + if err != nil { + return nil, err + } } g := &Client{ @@ -82,6 +100,7 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie token: token, listener: listener, jupyterClient: jupyter.NewJupyterServerHostClient(conn), + cancelPF: pfcancel, } return g, nil @@ -89,6 +108,8 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie // Closes the gRPC connection func (g *Client) Close() error { + g.cancelPF() + // Closing the local listener effectively closes the gRPC connection if err := g.listener.Close(); err != nil { g.conn.Close() // If we fail to close the listener, explicitly close the gRPC connection and ignore any error @@ -105,9 +126,7 @@ func (g *Client) appendMetadata(ctx context.Context) context.Context { // Starts a remote JupyterLab server to allow the user to connect to the codespace via JupyterLab in their browser func (g *Client) StartJupyterServer(ctx context.Context) (port int, serverUrl string, err error) { - ctx, cancel := context.WithTimeout(ctx, requestTimeout) ctx = g.appendMetadata(ctx) - defer cancel() response, err := g.jupyterClient.GetRunningServer(ctx, &jupyter.GetRunningServerRequest{}) if err != nil { diff --git a/internal/codespaces/grpc/client_test.go b/internal/codespaces/grpc/client_test.go index 62a7b1723..d905c0e29 100644 --- a/internal/codespaces/grpc/client_test.go +++ b/internal/codespaces/grpc/client_test.go @@ -3,29 +3,35 @@ package grpc import ( "context" "fmt" - "os" + "log" "testing" - "github.com/cli/cli/v2/internal/codespaces/grpc/test" + grpctest "github.com/cli/cli/v2/internal/codespaces/grpc/test" ) -func TestMain(m *testing.M) { +func startServer(t *testing.T) { + t.Helper() + + ctx, cancel := context.WithCancel(context.Background()) + // Start the gRPC server in the background go func() { - err := test.StartServer() - if err != nil { - panic(err) + err := grpctest.StartServer(ctx) + if err != nil && err != context.Canceled { + log.Println(fmt.Errorf("error starting test server: %v", err)) } }() - m.Run() - os.Exit(0) + // Stop the gRPC server when the test is done + t.Cleanup(func() { + cancel() + }) } -func connect(t *testing.T) (ctx context.Context, client *Client) { +func connect(t *testing.T) (client *Client) { t.Helper() - ctx = context.Background() - client, err := Connect(ctx, &test.Session{}, "token") + + client, err := Connect(context.Background(), &grpctest.Session{}, "token") if err != nil { t.Fatalf("error connecting to internal server: %v", err) } @@ -34,31 +40,34 @@ func connect(t *testing.T) (ctx context.Context, client *Client) { client.Close() }) - return ctx, client + return client } // Test that the gRPC client returns the correct port and URL when the JupyterLab server starts successfully func TestStartJupyterServerSuccess(t *testing.T) { - ctx, client := connect(t) - port, url, err := client.StartJupyterServer(ctx) + startServer(t) + client := connect(t) + + port, url, err := client.StartJupyterServer(context.Background()) if err != nil { t.Fatalf("expected %v, got %v", nil, err) } - if port != test.JupyterPort { - t.Fatalf("expected %d, got %d", test.JupyterPort, port) + if port != grpctest.JupyterPort { + t.Fatalf("expected %d, got %d", grpctest.JupyterPort, port) } - if url != test.JupyterServerUrl { - t.Fatalf("expected %s, got %s", test.JupyterServerUrl, url) + if url != grpctest.JupyterServerUrl { + t.Fatalf("expected %s, got %s", grpctest.JupyterServerUrl, url) } } // Test that the gRPC client returns an error when the JupyterLab server fails to start func TestStartJupyterServerFailure(t *testing.T) { - ctx, client := connect(t) - test.JupyterMessage = "error message" - test.JupyterResult = false - errorMessage := fmt.Sprintf("failed to start JupyterLab: %s", test.JupyterMessage) - port, url, err := client.StartJupyterServer(ctx) + startServer(t) + client := connect(t) + grpctest.JupyterMessage = "error message" + grpctest.JupyterResult = false + errorMessage := fmt.Sprintf("failed to start JupyterLab: %s", grpctest.JupyterMessage) + port, url, err := client.StartJupyterServer(context.Background()) if err.Error() != errorMessage { t.Fatalf("expected %v, got %v", errorMessage, err) } diff --git a/internal/codespaces/grpc/test/server.go b/internal/codespaces/grpc/test/server.go index 50608a9fa..8af5efc29 100644 --- a/internal/codespaces/grpc/test/server.go +++ b/internal/codespaces/grpc/test/server.go @@ -35,7 +35,7 @@ func (s *server) GetRunningServer(ctx context.Context, in *jupyter.GetRunningSer } // Starts the mock gRPC server listening on port 50051 -func StartServer() error { +func StartServer(ctx context.Context) error { listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", ServerPort)) if err != nil { return fmt.Errorf("failed to listen: %v", err) @@ -44,9 +44,19 @@ func StartServer() error { s := grpc.NewServer() jupyter.RegisterJupyterServerHostServer(s, &server{}) - if err := s.Serve(listener); err != nil { - return fmt.Errorf("failed to serve: %v", err) - } - return nil + ch := make(chan error, 1) + go func() { + if err := s.Serve(listener); err != nil { + ch <- fmt.Errorf("failed to serve: %v", err) + } + }() + + select { + case <-ctx.Done(): + s.Stop() + return ctx.Err() + case err := <-ch: + return err + } } diff --git a/internal/codespaces/grpc/test/session.go b/internal/codespaces/grpc/test/session.go index 70d81d41e..aba0f17ee 100644 --- a/internal/codespaces/grpc/test/session.go +++ b/internal/codespaces/grpc/test/session.go @@ -3,7 +3,6 @@ package test import ( "context" "fmt" - "log" "net" "github.com/cli/cli/v2/pkg/liveshare" @@ -11,23 +10,22 @@ import ( ) type Session struct { + channel ssh.Channel } func (s *Session) KeepAlive(reason string) { } func (s *Session) StartSharing(ctx context.Context, sessionName string, port int) (liveshare.ChannelID, error) { + conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", ServerPort)) + if err != nil { + return liveshare.ChannelID{}, err + } + s.channel = &Channel{conn} return liveshare.ChannelID{}, nil } // Creates mock SSH channel connected to the mock gRPC server func (s *Session) OpenStreamingChannel(ctx context.Context, id liveshare.ChannelID) (ssh.Channel, error) { - dialer := net.Dialer{} - conn, err := dialer.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", ServerPort)) - if err != nil { - log.Fatalf("failed to connect to the grpc server: %v", err) - } - return &Channel{ - conn: conn, - }, nil + return s.channel, nil } diff --git a/pkg/cmd/codespace/jupyter.go b/pkg/cmd/codespace/jupyter.go index c37a97ea7..928dc8871 100644 --- a/pkg/cmd/codespace/jupyter.go +++ b/pkg/cmd/codespace/jupyter.go @@ -45,17 +45,17 @@ func (a *App) Jupyter(ctx context.Context, codespaceName string) (err error) { defer safeClose(session, &err) a.StartProgressIndicatorWithLabel("Starting JupyterLab on codespace") - client, err := grpc.Connect(ctx, session, codespace.Connection.SessionToken) + client, err := connectToGRPCServer(ctx, session, codespace.Connection.SessionToken) if err != nil { - return fmt.Errorf("error connecting to internal server: %w", err) + return fmt.Errorf("failed to connect to internal server: %w", err) } defer safeClose(client, &err) - serverPort, serverUrl, err := client.StartJupyterServer(ctx) - a.StopProgressIndicator() + serverPort, serverUrl, err := startJupyterServer(ctx, client) if err != nil { return fmt.Errorf("failed to start JupyterLab server: %w", err) } + a.StopProgressIndicator() // Pass 0 to pick a random port listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", 0)) @@ -87,3 +87,26 @@ func (a *App) Jupyter(ctx context.Context, codespaceName string) (err error) { return nil // success } } + +func connectToGRPCServer(ctx context.Context, session liveshareSession, token string) (*grpc.Client, error) { + ctx, _ = context.WithTimeout(ctx, grpc.ConnectionTimeout) + + client, err := grpc.Connect(ctx, session, token) + if err != nil { + return nil, fmt.Errorf("error connecting to internal server: %w", err) + } + + return client, nil +} + +func startJupyterServer(ctx context.Context, client *grpc.Client) (int, string, error) { + ctx, cancel := context.WithTimeout(ctx, grpc.RequestTimeout) + defer cancel() + + serverPort, serverUrl, err := client.StartJupyterServer(ctx) + if err != nil { + return 0, "", fmt.Errorf("failed to start JupyterLab server: %w", err) + } + + return serverPort, serverUrl, nil +} diff --git a/pkg/liveshare/port_forwarder.go b/pkg/liveshare/port_forwarder.go index 923415c01..f042eeaea 100644 --- a/pkg/liveshare/port_forwarder.go +++ b/pkg/liveshare/port_forwarder.go @@ -99,41 +99,6 @@ func (fwd *PortForwarder) Forward(ctx context.Context, conn io.ReadWriteCloser) return awaitError(ctx, errc) } -// Loops until we can connect to the address or the context is canceled. -func WaitForPortConnection(ctx context.Context, address string) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - err := connectToAddr(address) - if err != nil { - continue - } - - return nil // success - } - } -} - -// Connects to and pings a given address to ensure that the server is shared and the port is forwarded. -func connectToAddr(address string) error { - // Verify that the port can be connected to - conn, err := net.Dial("tcp", address) - if err != nil { - return err - } - defer conn.Close() - - // Send a ping and make sure it succeed - _, err = conn.Write([]byte("ping")) - if err != nil { - return err - } - - return nil -} - func (fwd *PortForwarder) shareRemotePort(ctx context.Context) (ChannelID, error) { id, err := fwd.session.StartSharing(ctx, fwd.name, fwd.remotePort) if err != nil { From 9e13f6ba6b17b1aec11d7e0971e5748e7541925f Mon Sep 17 00:00:00 2001 From: Jose Garcia Date: Tue, 11 Oct 2022 09:14:23 -0400 Subject: [PATCH 13/14] cleanup connect --- internal/codespaces/grpc/client.go | 35 +++++++++++++++--------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/internal/codespaces/grpc/client.go b/internal/codespaces/grpc/client.go index 5ced382f8..76119d659 100644 --- a/internal/codespaces/grpc/client.go +++ b/internal/codespaces/grpc/client.go @@ -48,6 +48,12 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie if err != nil { return nil, fmt.Errorf("failed to listen to local port over tcp: %w", err) } + localAddress := fmt.Sprintf("127.0.0.1:%d", listener.Addr().(*net.TCPAddr).Port) + + client := &Client{ + token: token, + listener: listener, + } // Create a cancelable context to be able to cancel background tasks // if we encounter an error while connecting to the gRPC server @@ -58,29 +64,27 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie } }() + ch := make(chan error, 2) // Buffered channel to ensure we don't block on the goroutine + // Ensure we close the port forwarder if we encounter an error // or once the gRPC connection is closed. pfcancel is retained // to close the PF whenever we close the gRPC connection. pfctx, pfcancel := context.WithCancel(connectctx) - - ch := make(chan error, 2) // Buffered channel to ensure we don't block on the goroutine + client.cancelPF = pfcancel // Tunnel the remote gRPC server port to the local port - localAddress := fmt.Sprintf("127.0.0.1:%d", listener.Addr().(*net.TCPAddr).Port) go func() { fwd := liveshare.NewPortForwarder(session, codespacesInternalSessionName, codespacesInternalPort, true) ch <- fwd.ForwardToListener(pfctx, listener) }() - // Attempt to connect to the port - opts := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - } - var conn *grpc.ClientConn - go func() { + // Attempt to connect to the port + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + } conn, err = grpc.DialContext(connectctx, localAddress, opts...) ch <- err // nil if we successfully connected }() @@ -95,15 +99,10 @@ func Connect(ctx context.Context, session liveshareSession, token string) (*Clie } } - g := &Client{ - conn: conn, - token: token, - listener: listener, - jupyterClient: jupyter.NewJupyterServerHostClient(conn), - cancelPF: pfcancel, - } + client.conn = conn + client.jupyterClient = jupyter.NewJupyterServerHostClient(conn) - return g, nil + return client, nil } // Closes the gRPC connection From a356a1bef0bfb32546aa775f3cf2cd79b9bf7e6c Mon Sep 17 00:00:00 2001 From: Jose Garcia Date: Tue, 11 Oct 2022 13:15:34 -0400 Subject: [PATCH 14/14] no need to ignore cancel --- pkg/cmd/codespace/jupyter.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/codespace/jupyter.go b/pkg/cmd/codespace/jupyter.go index 928dc8871..5c19c2ab1 100644 --- a/pkg/cmd/codespace/jupyter.go +++ b/pkg/cmd/codespace/jupyter.go @@ -89,7 +89,8 @@ func (a *App) Jupyter(ctx context.Context, codespaceName string) (err error) { } func connectToGRPCServer(ctx context.Context, session liveshareSession, token string) (*grpc.Client, error) { - ctx, _ = context.WithTimeout(ctx, grpc.ConnectionTimeout) + ctx, cancel := context.WithTimeout(ctx, grpc.ConnectionTimeout) + defer cancel() client, err := grpc.Connect(ctx, session, token) if err != nil {