package main import ( "bufio" "crypto/tls" "encoding/json" "fmt" "io/ioutil" "math/rand" "net" "net/http" "net/url" "os" "strings" "time" "github.com/gorilla/websocket" "github.com/juju/errors" "github.com/juju/gnuflag" "github.com/juju/utils" "github.com/juju/juju/api/base" "github.com/juju/juju/apiserver/params" "github.com/juju/juju/version" ) var agent = gnuflag.String("agent", "machine-0", "set the agent name that we will connect as") var nonce = gnuflag.String("nonce", "user-admin:bootstrap", "machine agents need to pass the machine nonce") var password = gnuflag.String("password", "", "the password for this agent") var host = gnuflag.String("host", "localhost", "the host to connect to") var port = gnuflag.Int("port", 17070, "the apiserver port") var uuid = gnuflag.String("uuid", "", "model-uuid to connect to") type streamConnector struct { modelUUID string host string port int agent string password string nonce string } func (s streamConnector) apiPath(path string) string { return fmt.Sprintf("/model/%s%s", s.modelUUID, path) } func (s streamConnector) Addr() string { return net.JoinHostPort(s.host, fmt.Sprint(s.port)) } func (s streamConnector) tlsConfig() *tls.Config { // Start with good cipher suites and versions baseTLS := utils.SecureTLSConfig() baseTLS.InsecureSkipVerify = true baseTLS.ServerName = "juju-apiserver" return baseTLS } const bufSize = 65536 func (s *streamConnector) ConnectStream(path string, attrs url.Values) (base.Stream, error) { target := url.URL{ Scheme: "wss", Host: s.Addr(), Path: s.apiPath(path), RawQuery: attrs.Encode(), } dialer := &websocket.Dialer{ Proxy: nil, TLSClientConfig: s.tlsConfig(), ReadBufferSize: bufSize, WriteBufferSize: bufSize, } requestHeader := utils.BasicAuthHeader(s.agent, s.password) if s.nonce != "" { requestHeader.Set("X-Juju-Nonce", s.nonce) } connection, err := websocketDial(dialer, target.String(), requestHeader) if err != nil { return nil, errors.Trace(err) } if err := readInitialStreamError(connection); err != nil { connection.Close() return nil, errors.Trace(err) } return connection, nil } // readInitialStreamError reads the initial error response // from a stream connection and returns it. func readInitialStreamError(ws base.Stream) error { // We can use bufio here because the websocket guarantees that a // single read will not read more than a single frame; there is // no guarantee that a single read might not read less than the // whole frame though, so using a single Read call is not // correct. By using ReadSlice rather than ReadBytes, we // guarantee that the error can't be too big (>4096 bytes). messageType, reader, err := ws.NextReader() if err != nil { return errors.Annotate(err, "unable to get reader") } if messageType != websocket.TextMessage { return errors.Errorf("unexpected message type %v", messageType) } line, err := bufio.NewReader(reader).ReadSlice('\n') if err != nil { return errors.Annotate(err, "unable to read initial response") } var errResult params.ErrorResult if err := json.Unmarshal(line, &errResult); err != nil { return errors.Annotate(err, "unable to unmarshal initial response") } if errResult.Error != nil { return errResult.Error } return nil } func websocketDial(dialer *websocket.Dialer, urlStr string, requestHeader http.Header) (base.Stream, error) { c, resp, err := dialer.Dial(urlStr, requestHeader) if err != nil { if err == websocket.ErrBadHandshake { defer resp.Body.Close() body, readErr := ioutil.ReadAll(resp.Body) if readErr != nil { return nil, err } err = errors.Errorf("%s (%s)", strings.TrimSpace(string(body)), http.StatusText(resp.StatusCode), ) } return nil, err } return c, nil } const chars = "abcdefghijklmnopqrstuvwxyz" func main() { gnuflag.Parse(true) connector := &streamConnector{ modelUUID: *uuid, host: *host, port: *port, agent: *agent, password: *password, nonce: *nonce, } type MessageWriter interface { WriteMessage(int, []byte) error } attrs := make(url.Values) attrs.Set("jujuclientversion", version.Current.String()) attrs.Set("version", "1") conn, err := connector.ConnectStream("/logsink", attrs) if err != nil { panic(err) } writer := conn.(MessageWriter) //writer.Close() start := time.Now() rand.Seed(int64(start.Nanosecond() + os.Getpid())) prefix := string(chars[rand.Intn(len(chars))]) + string(chars[rand.Intn(len(chars))]) i := 0 broke := false for time.Since(start).Seconds() < 5.0 { record := ¶ms.LogRecord{ Time: time.Now(), Module: "juju.user", Location: "here", Level: "INFO", Message: fmt.Sprintf("%s this is my fight song %d", prefix, i), Entity: *agent, } asBytes, err := json.Marshal(record) if err != nil { panic(err) } if rand.Intn(100000) < 50 { // intentionally break asBytes asBytes = asBytes[:len(asBytes)-2] broke = true } if err := writer.WriteMessage(websocket.TextMessage, asBytes); err != nil { fmt.Printf("error: %v, broke: %t\n", err, broke) break } i++ time.Sleep(50*time.Microsecond) } duration := time.Since(start).Seconds() fmt.Printf("wrote %d messages in %.3fs (%.3f/s)\n", i, duration, float64(i)/duration) }