golang read numerous websockets

Over the last few weeks I have been lurking around Stack Overflow looking for information related to reading numerous websockets. Basically, I have numerous hosts that all emit messages over a websocket and I need to aggregate them.

I've accomplished this with Golang thus far for a single websocket connection. I've accomplished what I am looking for using Python as well, but I would really like to do this in Go!

I have used gorilla's websocket example as well as a few others and can successfully read a socket in Go. However, it seems the websocket server does not completely conform to typical development practice(s), since using a method such as .forEach or .Each in JS causes handshake failures.

Original Version


    package main

    import (
            "fmt"
            "golang.org/x/net/websocket"
            "log"
    )

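    var url = "ws://10.0.1.19:5000/data/websocket"

    // origin was missing from this listing; websocket.Dial needs it to compile.
    var origin = "http://localhost"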

    func main() {
            ws, err := websocket.Dial(url, "", origin)
            if err != nil {
                    log.Fatal(err)
            }

            var msg = make([]byte, 512)
            _, err = ws.Read(msg)
            if err != nil {
                    log.Fatal(err)
            }
            fmt.Printf("Receive: %s\n", msg)
    }

I don't actually need to send any data to the socket. I simply need to connect and keep reading from it, and then I'll aggregate that data into a single stream for later operations.


Update (2016-03-19)

After continued changes and testing with both gorilla and the older x/net/websocket library, I found that the websocket server(s) I am connecting to apparently do not adhere to the standard that gorilla expects for the handshake. Either that, or I am not telling gorilla how to connect properly. x/net/websocket connects just fine; I just specify localhost/ as the origin and it seems to work. I am unsure how to tell gorilla to do the same to see whether it behaves the same way. DefaultDialer.Dial() has some configuration options, but with my modest Go knowledge I have not yet found a way to use them for what I am trying to do.

Current Version (2016-03-19)

package main

import (
    "fmt"
    "golang.org/x/net/websocket"
    // "log"
    "time"
)

var origin = "http://localhost"

type url struct {
    host string
}

func processUrl(host string, messages chan []byte) {
    client, err := websocket.Dial(host, "", origin)
    if err != nil {
        // Dial failed, so client is nil: return before the deferred Close below.
        fmt.Printf("dial %s: %v\n", host, err)
        return
    }
    // Clean up on exit from this goroutine
    defer client.Close()
    // Loop reading messages. Send each message to the channel.
    for {
        var msg = make([]byte, 512)
        _, err = client.Read(msg)
        if err != nil {
            // log.Fatal("read:", err)
            return
        }
        messages <- msg
    }
}

func main() {

    // Create a slice of hosts to read websockets from
    urls := []string{
        "ws://10.0.1.90:3000/data/websocket",
        "ws://10.0.2.90:3000/data/websocket",
        "ws://10.0.3.90:3000/data/websocket",
    }

    // Create channel to receive messages from all connections
    messages := make(chan []byte)

    // Run a goroutine for each URL that you want to dial.
    for _, host := range urls {
        go processUrl(host, messages)
    }

    // Print all messages received from the goroutines.
    for msg := range messages {
        fmt.Printf("%d %s\n", time.Now().Unix(), msg)
    }

}

RESPONSE (message from ws):

    {
        "src_city":"Wayne",
        "dest_city":"Amsterdam",
        "src_country":"US",
        "dest_country":"NL",
        "type":"view"
    }

IOWait Issue(s)

One issue I am running into is IOWait errors. I ran the binary overnight against 10 websockets without any issue. When I ran it against all 488 that I need to read, it hit IOWait for 2 minutes, etc. Some of the goroutine errors I see:


    goroutine 72 [IO wait]:
    net.runtime_pollWait(0x7f356149b208, 0x72, 0x0)
        /usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
    net.(*pollDesc).Wait(0xc20804e610, 0x72, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
    net.(*pollDesc).WaitRead(0xc20804e610, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
    net.(*netFD).Read(0xc20804e5b0, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
        /usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
    net.(*conn).Read(0xc20803a150, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/net.go:122 +0xe7
    bufio.(*Reader).fill(0xc208005140)
        /usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
    bufio.(*Reader).ReadByte(0xc208005140, 0xc2080f22d0, 0x0, 0x0)
        /usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
    golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005140, 0x7f356149b908, 0xc2080f22d0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
    golang.org/x/net/websocket.(*Conn).Read(0xc2080d7050, 0xc2080f4c00, 0x200, 0x200, 0x0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
    main.processUrl(0x705010, 0x26, 0xc208004180)
        /home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
    created by main.main
        /home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

    goroutine 73 [IO wait, 2 minutes]:
    net.runtime_pollWait(0x7f356149b158, 0x72, 0x0)
        /usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
    net.(*pollDesc).Wait(0xc20804e760, 0x72, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
    net.(*pollDesc).WaitRead(0xc20804e760, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
    net.(*netFD).Read(0xc20804e700, 0xc208015000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
        /usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
    net.(*conn).Read(0xc20803a018, 0xc208015000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/net.go:122 +0xe7
    bufio.(*Reader).fill(0xc2080042a0)
        /usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
    bufio.(*Reader).ReadByte(0xc2080042a0, 0x67d6e0, 0x0, 0x0)
        /usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
    golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc2080042a0, 0x7f356149b908, 0xc2080196d0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
    golang.org/x/net/websocket.(*Conn).Read(0xc208024240, 0xc208080000, 0x200, 0x200, 0x0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
    main.processUrl(0x705190, 0x25, 0xc208004180)
        /home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
    created by main.main
        /home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

    goroutine 74 [IO wait]:
    net.runtime_pollWait(0x7f356149b0a8, 0x72, 0x0)
        /usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
    net.(*pollDesc).Wait(0xc20804e8b0, 0x72, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
    net.(*pollDesc).WaitRead(0xc20804e8b0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
    net.(*netFD).Read(0xc20804e850, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
        /usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
    net.(*conn).Read(0xc20803a160, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/net.go:122 +0xe7
    bufio.(*Reader).fill(0xc208005200)
        /usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
    bufio.(*Reader).ReadByte(0xc208005200, 0xc2080f2320, 0x0, 0x0)
        /usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
    golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005200, 0x7f356149b908, 0xc2080f2320, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
    golang.org/x/net/websocket.(*Conn).Read(0xc2080d70e0, 0xc2080f4e00, 0x200, 0x200, 0x0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
    main.processUrl(0x7052d0, 0x27, 0xc208004180)
        /home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
    created by main.main
        /home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

I have another binary that attempts an initial connection to each websocket address so that I can make sure it is reachable, but that is another issue altogether. My outstanding questions are:

  1. How do I use Gorilla's websocket implementation to read from the websocket being served by SockJS, as I am doing with x/net/websocket's implementation?
  2. What is the likely cause of the IOWait issue(s) I am seeing?
  3. Because I am using []byte as the response, my log file (I'm just piping stdout to a file) contains lines that have binary data appended to the end. How should I convert the byte slice to text/string before writing it to stdout, so as to avoid that?
  4. Using the chan, how best can I pass the host of the websocket along with each message from my process function, so I can log both the host and the message? Should I define a struct as the channel's element type and have it contain what I want: timestamp, host, message?

For the IOWait errors, I can only imagine that (a) a connection cannot be established and the goroutine holds it open until it eventually throws an error, or (b) I have too many goroutines running. I have tried with 10, 20, 50, and all 400+ hosts: a version with 10 hosts works as long as all 10 are responding, and a version with 10 hosts fails when one of the hosts is not responding.

I will likely have subsequent questions, but I appreciate the insight and help. The channel suggestion definitely got me going. I've used them once before but don't always understand how best to implement them. My other project leverages channels and wait groups (wg), but I honestly don't understand the point of one over the other.

Thanks again, your thoughts and suggestions have been wonderful!

Apologies for the weird syntax in this post; I cannot seem to get the editor to remove some of the empty lines around my code elements.

Answers


Start a goroutine to read each connection. Send received messages to a channel. Receive from that channel to get messages from all connections.

// Create channel to receive messages from all connections
messages := make(chan []byte)

// Run a goroutine for each URL that you want to dial.
for _, u := range urls {
    go func(u string) {
        // Dial with Gorilla package. The x/net/websocket package has issues.
        c, _, err := websocket.DefaultDialer.Dial(u, http.Header{"Origin":{origin}})
        if err != nil {
            // Give up on this host only; log.Fatal would exit the whole program.
            log.Println("dial:", err)
            return
        }
        // Clean up on exit from this goroutine
        defer c.Close()
        // Loop reading messages. Send each message to the channel.
        for {
            _, m, err := c.ReadMessage()
            if err != nil {
                // Stop reading this connection; the other goroutines keep running.
                log.Println("read:", err)
                return
            }
            messages <- m
        }
    }(u)
}

// Print all messages received from the goroutines.
for m := range messages {
    fmt.Printf("%s\n", m)
}
