Initial spike with request/event handling

This commit is contained in:
Jose Garcia 2022-02-14 17:22:58 -05:00
parent bf83c660a1
commit 04a4e43dec
5 changed files with 159 additions and 4 deletions

View file

@ -97,6 +97,9 @@ func (fwd *PortForwarder) shareRemotePort(ctx context.Context) (channelID, error
if err != nil {
err = fmt.Errorf("failed to share remote port %d: %w", fwd.remotePort, err)
}
// wait for port change kind start
return id, err
}

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/opentracing/opentracing-go"
@ -13,15 +14,18 @@ import (
type rpcClient struct {
*jsonrpc2.Conn
conn io.ReadWriteCloser
eventHandlersMu sync.RWMutex
eventHandlers map[string]chan []byte
}
func newRPCClient(conn io.ReadWriteCloser) *rpcClient {
return &rpcClient{conn: conn}
return &rpcClient{conn: conn, eventHandlers: make(map[string]chan []byte)}
}
func (r *rpcClient) connect(ctx context.Context) {
stream := jsonrpc2.NewBufferedStream(r.conn, jsonrpc2.VSCodeObjectCodec{})
r.Conn = jsonrpc2.NewConn(ctx, stream, nullHandler{})
r.Conn = jsonrpc2.NewConn(ctx, stream, newRequestHandler(r))
}
func (r *rpcClient) do(ctx context.Context, method string, args, result interface{}) error {
@ -40,7 +44,43 @@ func (r *rpcClient) do(ctx context.Context, method string, args, result interfac
return waiter.Wait(waitCtx, result)
}
type nullHandler struct{}
func (r *rpcClient) registerEventHandler(eventName string) chan []byte {
r.eventHandlersMu.Lock()
defer r.eventHandlersMu.Unlock()
func (nullHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
if ch, ok := r.eventHandlers[eventName]; ok {
return ch
}
ch := make(chan []byte)
r.eventHandlers[eventName] = ch
return ch
}
func (r *rpcClient) eventHandler(eventName string) chan []byte {
r.eventHandlersMu.RLock()
defer r.eventHandlersMu.RUnlock()
return r.eventHandlers[eventName]
}
type requestHandler struct {
rpcClient *rpcClient
}
func newRequestHandler(rpcClient *rpcClient) *requestHandler {
return &requestHandler{rpcClient: rpcClient}
}
func (e *requestHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
handler := e.rpcClient.eventHandler(req.Method)
if handler == nil {
return // noop
}
select {
case handler <- *req.Params:
default:
// event handler
}
}

55
pkg/liveshare/rpc_test.go Normal file
View file

@ -0,0 +1,55 @@
package liveshare
import (
"context"
"encoding/json"
"fmt"
"net"
"testing"
"github.com/sourcegraph/jsonrpc2"
)
func TestRequestHandler(t *testing.T) {
r, w := net.Pipe()
client := newRPCClient(r)
ctx := context.Background()
client.connect(ctx)
type params struct {
Data string `json:"data"`
}
ev := client.registerEventHandler("testEvent")
done := make(chan error)
go func() {
b := <-ev
var receivedParams params
if err := json.Unmarshal(b, &receivedParams); err != nil {
done <- err
return
}
if receivedParams.Data != "test" {
done <- fmt.Errorf("expected test, got %q", receivedParams.Data)
}
done <- nil
}()
go func() {
codec := jsonrpc2.VSCodeObjectCodec{}
type message struct {
Method string `json:"method"`
Params params `json:"params"`
}
codec.WriteObject(w, message{
Method: "testEvent",
Params: params{"test"},
})
}()
err := <-done
if err != nil {
t.Fatal(err)
}
}

View file

@ -78,6 +78,10 @@ func (s *Session) UpdateSharedServerPrivacy(ctx context.Context, port int, visib
return nil
}
func (s *Session) WaitForEvent(eventName string) chan []byte {
return s.rpc.registerEventHandler(eventName)
}
// StartsSSHServer starts an SSH server in the container, installing sshd if necessary,
// and returns the port on which it listens and the user name clients should provide.
func (s *Session) StartSSHServer(ctx context.Context) (int, string, error) {