Skip to content
Snippets Groups Projects
Unverified Commit b25ec7c9 authored by Kali Kaneko's avatar Kali Kaneko
Browse files

[feat] snowflake client support

parent c7148d95
No related branches found
No related tags found
No related merge requests found
{
"motd": [{
"begin": "01 Nov 21 00:00 -0700",
"end": "31 Jan 22 00:00 -0700",
"type": "daily",
"platform": "all",
"urgency": "normal",
"text": [
{ "lang": "en",
"str": "%20___________%0A%3C%20RiseupVPN%20%3E%0A%20-----------%0A%20%20%20%20%20%20%20%20%5C%20%20%20%5E__%5E%0A%20%20%20%20%20%20%20%20%20%5C%20%20%28oo%29%5C_______%0A%20%20%20%20%20%20%20%20%20%20%20%20%28__%29%5C%20%20%20%20%20%20%20%29%5C%2F%5C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%7C%7C----w%20%7C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%7C%7C%20%20%20%20%20%7C%7C"},
{ "lang": "es",
"str": "¡Gracias por usar RiseupVPN! Por favor reportanos <a href='https://0xacab.org/leap/bitmask-vpn'>cualquier bug o petición</a>."}
]}
]
}
snowflake-client
// Client transport plugin for the Snowflake pluggable transport.
package main
import (
"flag"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
pt "git.torproject.org/pluggable-transports/goptlib.git"
//sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib"
sf "0xacab.org/leap/bitmask-vpn/pkg/snowflake/lib"
"git.torproject.org/pluggable-transports/snowflake.git/common/nat"
"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
"github.com/pion/webrtc/v3"
)
const (
	// DefaultSnowflakeCapacity is the default number of multiplexed WebRTC
	// peers to maintain; overridable with the -max command-line flag.
	DefaultSnowflakeCapacity = 1
)
// socksAcceptLoop accepts local SOCKS connections and passes them to the
// handler. It runs until the listener fails with a non-temporary error.
// Each accepted connection is serviced in its own goroutine, registered in
// wg so that main can wait for in-flight connections during shutdown; a
// close of the shutdown channel tears down every active connection.
func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struct{}, wg *sync.WaitGroup) {
	defer ln.Close()
	for {
		conn, err := ln.AcceptSocks()
		if err != nil {
			if err, ok := err.(net.Error); ok && err.Temporary() {
				continue
			}
			log.Printf("SOCKS accept error: %s", err)
			break
		}
		log.Printf("SOCKS accepted: %v", conn.Req)
		// BUG FIX: wg.Add must happen before the goroutine is launched,
		// not inside it; otherwise wg.Wait() in main can observe a zero
		// counter and return before this connection registers itself.
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer conn.Close()
			err := conn.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
			if err != nil {
				log.Printf("conn.Grant error: %s", err)
				return
			}
			handler := make(chan struct{})
			go func() {
				err = sf.Handler(conn, tongue)
				if err != nil {
					log.Printf("handler error: %s", err)
				}
				close(handler)
			}()
			// Block until either the handler finishes or shutdown is
			// requested; the deferred Close then tears down the conn.
			select {
			case <-shutdown:
				log.Println("Received shutdown signal")
			case <-handler:
				log.Println("Handler ended")
			}
		}()
	}
}
// parseIceServers converts a comma-separated list of ICE server URLs into a
// slice of webrtc.ICEServer values, one URL per entry. It returns nil when
// the input is empty or only whitespace.
func parseIceServers(s string) []webrtc.ICEServer {
	trimmed := strings.TrimSpace(s)
	if len(trimmed) == 0 {
		return nil
	}
	var servers []webrtc.ICEServer
	for _, u := range strings.Split(trimmed, ",") {
		servers = append(servers, webrtc.ICEServer{
			URLs: []string{strings.TrimSpace(u)},
		})
	}
	return servers
}
// main is the entry point of the snowflake pluggable-transport client. It
// parses flags, configures (optionally scrubbed) logging, shuffles and
// halves the configured ICE servers, connects to the signaling broker,
// starts a SOCKS listener for each transport method tor requests, and then
// blocks until SIGTERM (or stdin close, when tor asks for that) before
// shutting everything down.
func main() {
	iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers")
	brokerURL := flag.String("url", "", "URL of signaling broker")
	frontDomain := flag.String("front", "", "front domain")
	logFilename := flag.String("log", "", "name of log file")
	logToStateDir := flag.Bool("log-to-state-dir", false, "resolve the log file relative to tor's pt state dir")
	keepLocalAddresses := flag.Bool("keep-local-addresses", false, "keep local LAN address ICE candidates")
	unsafeLogging := flag.Bool("unsafe-logging", false, "prevent logs from being scrubbed")
	max := flag.Int("max", DefaultSnowflakeCapacity,
		"capacity for number of multiplexed WebRTC peers")
	// Deprecated camelCase aliases, kept for backward compatibility.
	oldLogToStateDir := flag.Bool("logToStateDir", false, "use -log-to-state-dir instead")
	oldKeepLocalAddresses := flag.Bool("keepLocalAddresses", false, "use -keep-local-addresses instead")
	flag.Parse()
	log.SetFlags(log.LstdFlags | log.LUTC)
	// Don't write to stderr; versions of tor earlier than about 0.3.5.6 do
	// not read from the pipe, and eventually we will deadlock because the
	// buffer is full.
	// https://bugs.torproject.org/26360
	// https://bugs.torproject.org/25600#comment:14
	var logOutput = ioutil.Discard
	if *logFilename != "" {
		if *logToStateDir || *oldLogToStateDir {
			stateDir, err := pt.MakeStateDir()
			if err != nil {
				log.Fatal(err)
			}
			*logFilename = filepath.Join(stateDir, *logFilename)
		}
		logFile, err := os.OpenFile(*logFilename,
			os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
		if err != nil {
			log.Fatal(err)
		}
		defer logFile.Close()
		logOutput = logFile
	}
	if *unsafeLogging {
		log.SetOutput(logOutput)
	} else {
		// We want to send the log output through our scrubber first
		log.SetOutput(&safelog.LogScrubber{Output: logOutput})
	}
	log.Println("\n\n\n --- Starting Snowflake Client ---")
	iceServers := parseIceServers(*iceServersCommas)
	// chooses a random subset of servers from inputs
	rand.Seed(time.Now().UnixNano())
	rand.Shuffle(len(iceServers), func(i, j int) {
		iceServers[i], iceServers[j] = iceServers[j], iceServers[i]
	})
	if len(iceServers) > 2 {
		// Keep a random half (rounded up) of the configured servers.
		iceServers = iceServers[:(len(iceServers)+1)/2]
	}
	log.Printf("Using ICE servers:")
	for _, server := range iceServers {
		log.Printf("url: %v", strings.Join(server.URLs, " "))
	}
	// Use potentially domain-fronting broker to rendezvous.
	broker, err := sf.NewBrokerChannel(
		*brokerURL, *frontDomain, sf.CreateBrokerTransport(),
		*keepLocalAddresses || *oldKeepLocalAddresses)
	if err != nil {
		log.Fatalf("parsing broker URL: %v", err)
	}
	// Probe the STUN servers in the background to report our NAT type.
	go updateNATType(iceServers, broker)
	// Create a new WebRTCDialer to use as the |Tongue| to catch snowflakes
	dialer := sf.NewWebRTCDialer(broker, iceServers, *max)
	// Begin goptlib client process.
	ptInfo, err := pt.ClientSetup(nil)
	if err != nil {
		log.Fatal(err)
	}
	if ptInfo.ProxyURL != nil {
		pt.ProxyError("proxy is not supported")
		os.Exit(1)
	}
	listeners := make([]net.Listener, 0)
	shutdown := make(chan struct{})
	var wg sync.WaitGroup
	for _, methodName := range ptInfo.MethodNames {
		switch methodName {
		case "snowflake":
			// TODO: Be able to recover when SOCKS dies.
			ln, err := pt.ListenSocks("tcp", "127.0.0.1:0")
			if err != nil {
				pt.CmethodError(methodName, err.Error())
				break
			}
			log.Printf("Started SOCKS listener at %v.", ln.Addr())
			go socksAcceptLoop(ln, dialer, shutdown, &wg)
			pt.Cmethod(methodName, ln.Version(), ln.Addr())
			listeners = append(listeners, ln)
		default:
			pt.CmethodError(methodName, "no such method")
		}
	}
	pt.CmethodsDone()
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM)
	if os.Getenv("TOR_PT_EXIT_ON_STDIN_CLOSE") == "1" {
		// This environment variable means we should treat EOF on stdin
		// just like SIGTERM: https://bugs.torproject.org/15435.
		go func() {
			if _, err := io.Copy(ioutil.Discard, os.Stdin); err != nil {
				log.Printf("calling io.Copy(ioutil.Discard, os.Stdin) returned error: %v", err)
			}
			log.Printf("synthesizing SIGTERM because of stdin close")
			sigChan <- syscall.SIGTERM
		}()
	}
	// Wait for a signal.
	<-sigChan
	log.Println("stopping snowflake")
	// Signal received, shut down.
	for _, ln := range listeners {
		ln.Close()
	}
	close(shutdown)
	wg.Wait()
	log.Println("snowflake is done.")
}
// updateNATType loops through all provided STUN servers until we exhaust the
// list or find one that is compatible with RFC 5780, then reports the
// detected NAT type to the broker. If every probe fails, the NAT type is
// reported as unknown; if the list is empty, no report is made at all.
func updateNATType(servers []webrtc.ICEServer, broker *sf.BrokerChannel) {
	var lastErr error
	for _, server := range servers {
		addr := strings.TrimPrefix(server.URLs[0], "stun:")
		restricted, probeErr := nat.CheckIfRestrictedNAT(addr)
		lastErr = probeErr
		if probeErr != nil {
			continue
		}
		if restricted {
			broker.SetNATType(nat.NATRestricted)
		} else {
			broker.SetNATType(nat.NATUnrestricted)
		}
		break
	}
	if lastErr != nil {
		broker.SetNATType(nat.NATUnknown)
	}
}
package snowflake
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"time"
"0xacab.org/leap/bitmask-vpn/pkg/config"
"github.com/cretz/bine/tor"
)
const torrc = `UseBridges 1
DataDirectory datadir
ClientTransportPlugin snowflake exec /usr/local/bin/snowflake-client \
-url https://snowflake-broker.torproject.net.global.prod.fastly.net/ -front cdn.sstatic.net \
-ice stun:stun.voip.blackberry.com:3478,stun:stun.altar.com.pl:3478,stun:stun.antisip.com:3478,stun:stun.bluesip.net:3478,stun:stun.dus.net:3478,stun:stun.epygi.com:3478,stun:stun.sonetel.com:3478,stun:stun.sonetel.net:3478,stun:stun.stunprotocol.org:3478,stun:stun.uls.co.za:3478,stun:stun.voipgate.com:3478,stun:stun.voys.nl:3478 \
-max 3
Bridge snowflake 0.0.3.0:1`
func writeTorrc() string {
f, err := ioutil.TempFile("", "torrc-snowflake-")
if err != nil {
log.Println(err)
}
f.Write([]byte(torrc))
return f.Name()
}
// BootstrapWithSnowflakeProxies starts an embedded tor client configured (via
// the generated torrc) to reach the network through snowflake proxies, then
// fetches the eip-service definition and the client certificate over that
// circuit. It blocks until tor starts and both fetches complete (or fail).
func BootstrapWithSnowflakeProxies() error {
	rcfile := writeTorrc()
	conf := &tor.StartConf{DebugWriter: os.Stdout, TorrcFile: rcfile}
	fmt.Println("Starting Tor and fetching files to bootstrap VPN tunnel...")
	fmt.Println("")
	t, err := tor.Start(nil, conf)
	if err != nil {
		return err
	}
	defer t.Close()
	// Wait at most 10 minutes for the tor dialer to become available.
	dialCtx, dialCancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer dialCancel()
	dialer, err := t.Dialer(dialCtx, nil)
	if err != nil {
		return err
	}
	/*
		regClient := &http.Client{
			Transport: &http.Transport{
				DialContext: dialer.DialContext,
			},
			Timeout: time.Minute * 5,
		}
	*/
	//fetchFile(regClient, "https://wtfismyip.com/json")
	// Pin the provider CA so the API endpoints are verified against it
	// rather than the system trust store.
	certs := x509.NewCertPool()
	certs.AppendCertsFromPEM(config.CaCert)
	apiClient := &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				RootCAs: certs,
			},
			DialContext: dialer.DialContext,
		},
		Timeout: time.Minute * 5,
	}
	// XXX parametrize these urls
	// NOTE(review): fetchFile errors are ignored here, so a failed
	// bootstrap fetch still returns nil — confirm this is intentional.
	fetchFile(apiClient, "https://api.black.riseup.net/3/config/eip-service.json")
	fetchFile(apiClient, "https://api.black.riseup.net/3/cert")
	return nil
}
func fetchFile(client *http.Client, uri string) error {
resp, err := client.Get(uri)
if err != nil {
return err
}
defer resp.Body.Close()
c, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println(err)
}
fmt.Println(string(c))
return nil
}
package lib
import (
"net"
)
// Tongue is the interface for catching Snowflakes. (aka the remote dialer)
type Tongue interface {
	// Catch establishes and returns a new remote WebRTC peer.
	Catch() (*WebRTCPeer, error)
	// GetMax returns the maximum number of snowflakes to collect.
	GetMax() int
}

// SnowflakeCollector is the interface for collecting some number of
// Snowflakes, for passing along ultimately to the SOCKS handler.
type SnowflakeCollector interface {
	// Collect adds a Snowflake to the collection.
	// Implementation should decide how to connect and maintain the webRTCConn.
	Collect() (*WebRTCPeer, error)
	// Pop removes and returns the most available Snowflake from the collection.
	Pop() *WebRTCPeer
	// Melted signals when the collector has stopped collecting.
	Melted() <-chan struct{}
}

// SocksConnector is the interface to adapt to goptlib's SocksConn struct.
type SocksConnector interface {
	Grant(*net.TCPAddr) error
	Reject() error
	net.Conn
}
package lib
import (
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"testing"
"git.torproject.org/pluggable-transports/snowflake.git/common/util"
. "github.com/smartystreets/goconvey/convey"
)
type MockTransport struct {
statusOverride int
body []byte
}
// Just returns a response with fake SDP answer.
func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
s := ioutil.NopCloser(bytes.NewReader(m.body))
r := &http.Response{
StatusCode: m.statusOverride,
Body: s,
}
return r, nil
}
// FakeDialer implements Tongue without any real signaling; it hands out
// zero-value peers and is used to drive the Peers tests.
type FakeDialer struct {
	max int
}

// Catch always succeeds, returning a fresh empty peer.
func (w FakeDialer) Catch() (*WebRTCPeer, error) {
	fmt.Println("Caught a dummy snowflake.")
	return &WebRTCPeer{}, nil
}

// GetMax reports the configured capacity.
func (w FakeDialer) GetMax() int {
	return w.max
}
// FakeSocksConn is a minimal SOCKS-connection stand-in that records whether
// Reject was called.
type FakeSocksConn struct {
	net.Conn
	rejected bool
}

// Reject marks the connection as rejected.
//
// BUG FIX: the receiver must be a pointer; with a value receiver the
// assignment mutated a copy and the rejected flag was silently lost, so the
// assertion `So(socks.rejected, ShouldEqual, true)` could never pass.
func (f *FakeSocksConn) Reject() error {
	f.rejected = true
	return nil
}

// Grant is a no-op for the fake; it always succeeds.
func (f *FakeSocksConn) Grant(addr *net.TCPAddr) error { return nil }
// FakePeers is a trivial SnowflakeCollector for tests: Collect always
// succeeds with an empty peer, while Pop and Melted return nothing.
type FakePeers struct{ toRelease *WebRTCPeer }

func (f FakePeers) Collect() (*WebRTCPeer, error) { return &WebRTCPeer{}, nil }
func (f FakePeers) Pop() *WebRTCPeer              { return nil }
func (f FakePeers) Melted() <-chan struct{}       { return nil }
// TestSnowflakeClient exercises the Peers collection, the WebRTC dialers,
// and the BrokerChannel rendezvous logic, using FakeDialer and MockTransport
// so no real network or WebRTC stack is needed. Cases that would require
// real signaling are wrapped in SkipConvey.
func TestSnowflakeClient(t *testing.T) {
	Convey("Peers", t, func() {
		Convey("Can construct", func() {
			d := &FakeDialer{max: 1}
			p, _ := NewPeers(d)
			So(p.Tongue.GetMax(), ShouldEqual, 1)
			So(p.snowflakeChan, ShouldNotBeNil)
			So(cap(p.snowflakeChan), ShouldEqual, 1)
		})
		Convey("Collecting a Snowflake requires a Tongue.", func() {
			p, err := NewPeers(nil)
			So(err, ShouldNotBeNil)
			// Set the dialer so that collection is possible.
			d := &FakeDialer{max: 1}
			p, err = NewPeers(d)
			_, err = p.Collect()
			So(err, ShouldBeNil)
			So(p.Count(), ShouldEqual, 1)
			// A second Collect exceeds capacity; its error is deliberately
			// not asserted here.
			_, err = p.Collect()
		})
		Convey("Collection continues until capacity.", func() {
			c := 5
			p, _ := NewPeers(FakeDialer{max: c})
			// Fill up to capacity.
			for i := 0; i < c; i++ {
				fmt.Println("Adding snowflake ", i)
				_, err := p.Collect()
				So(err, ShouldBeNil)
				So(p.Count(), ShouldEqual, i+1)
			}
			// But adding another gives an error.
			So(p.Count(), ShouldEqual, c)
			_, err := p.Collect()
			So(err, ShouldNotBeNil)
			So(p.Count(), ShouldEqual, c)
			// But popping and closing allows it to continue.
			s := p.Pop()
			s.Close()
			So(s, ShouldNotBeNil)
			So(p.Count(), ShouldEqual, c-1)
			_, err = p.Collect()
			So(err, ShouldBeNil)
			So(p.Count(), ShouldEqual, c)
		})
		Convey("Count correctly purges peers marked for deletion.", func() {
			p, _ := NewPeers(FakeDialer{max: 5})
			p.Collect()
			p.Collect()
			p.Collect()
			p.Collect()
			So(p.Count(), ShouldEqual, 4)
			s := p.Pop()
			s.Close()
			So(p.Count(), ShouldEqual, 3)
			s = p.Pop()
			s.Close()
			So(p.Count(), ShouldEqual, 2)
		})
		Convey("End Closes all peers.", func() {
			cnt := 5
			p, _ := NewPeers(FakeDialer{max: cnt})
			for i := 0; i < cnt; i++ {
				p.activePeers.PushBack(&WebRTCPeer{})
			}
			So(p.Count(), ShouldEqual, cnt)
			p.End()
			<-p.Melted()
			So(p.Count(), ShouldEqual, 0)
		})
		Convey("Pop skips over closed peers.", func() {
			p, _ := NewPeers(FakeDialer{max: 4})
			wc1, _ := p.Collect()
			wc2, _ := p.Collect()
			wc3, _ := p.Collect()
			So(wc1, ShouldNotBeNil)
			So(wc2, ShouldNotBeNil)
			So(wc3, ShouldNotBeNil)
			wc1.Close()
			r := p.Pop()
			So(p.Count(), ShouldEqual, 2)
			So(r, ShouldEqual, wc2)
			wc4, _ := p.Collect()
			wc2.Close()
			wc3.Close()
			r = p.Pop()
			So(r, ShouldEqual, wc4)
		})
	})
	Convey("Snowflake", t, func() {
		// Skipped: requires real WebRTC signaling to exercise Handler.
		SkipConvey("Handler Grants correctly", func() {
			socks := &FakeSocksConn{}
			broker := &BrokerChannel{Host: "test"}
			d := NewWebRTCDialer(broker, nil, 1)
			So(socks.rejected, ShouldEqual, false)
			Handler(socks, d)
			So(socks.rejected, ShouldEqual, true)
		})
	})
	Convey("Dialers", t, func() {
		Convey("Can construct WebRTCDialer.", func() {
			broker := &BrokerChannel{Host: "test"}
			d := NewWebRTCDialer(broker, nil, 1)
			So(d, ShouldNotBeNil)
			So(d.BrokerChannel, ShouldNotBeNil)
			So(d.BrokerChannel.Host, ShouldEqual, "test")
		})
		SkipConvey("WebRTCDialer can Catch a snowflake.", func() {
			broker := &BrokerChannel{Host: "test"}
			d := NewWebRTCDialer(broker, nil, 1)
			conn, err := d.Catch()
			So(conn, ShouldBeNil)
			So(err, ShouldNotBeNil)
		})
	})
	Convey("Rendezvous", t, func() {
		// A canned broker answer and a minimal fake SDP offer shared by
		// the Negotiate cases below.
		transport := &MockTransport{
			http.StatusOK,
			[]byte(`{"type":"answer","sdp":"fake"}`),
		}
		fakeOffer, err := util.DeserializeSessionDescription(`{"type":"offer","sdp":"test"}`)
		if err != nil {
			panic(err)
		}
		Convey("Construct BrokerChannel with no front domain", func() {
			b, err := NewBrokerChannel("test.broker", "", transport, false)
			So(b.url, ShouldNotBeNil)
			So(err, ShouldBeNil)
			So(b.url.Path, ShouldResemble, "test.broker")
			So(b.transport, ShouldNotBeNil)
		})
		Convey("Construct BrokerChannel *with* front domain", func() {
			b, err := NewBrokerChannel("test.broker", "front", transport, false)
			So(b.url, ShouldNotBeNil)
			So(err, ShouldBeNil)
			So(b.url.Path, ShouldResemble, "test.broker")
			So(b.url.Host, ShouldResemble, "front")
			So(b.transport, ShouldNotBeNil)
		})
		Convey("BrokerChannel.Negotiate responds with answer", func() {
			b, err := NewBrokerChannel("test.broker", "", transport, false)
			So(err, ShouldBeNil)
			answer, err := b.Negotiate(fakeOffer)
			So(err, ShouldBeNil)
			So(answer, ShouldNotBeNil)
			So(answer.SDP, ShouldResemble, "fake")
		})
		Convey("BrokerChannel.Negotiate fails with 503", func() {
			b, err := NewBrokerChannel("test.broker", "",
				&MockTransport{http.StatusServiceUnavailable, []byte("\n")},
				false)
			So(err, ShouldBeNil)
			answer, err := b.Negotiate(fakeOffer)
			So(err, ShouldNotBeNil)
			So(answer, ShouldBeNil)
			So(err.Error(), ShouldResemble, BrokerError503)
		})
		Convey("BrokerChannel.Negotiate fails with 400", func() {
			b, err := NewBrokerChannel("test.broker", "",
				&MockTransport{http.StatusBadRequest, []byte("\n")},
				false)
			So(err, ShouldBeNil)
			answer, err := b.Negotiate(fakeOffer)
			So(err, ShouldNotBeNil)
			So(answer, ShouldBeNil)
			So(err.Error(), ShouldResemble, BrokerError400)
		})
		Convey("BrokerChannel.Negotiate fails with large read", func() {
			// Body exceeds readLimit (100000), so limitedRead truncates
			// and reports io.ErrUnexpectedEOF.
			b, err := NewBrokerChannel("test.broker", "",
				&MockTransport{http.StatusOK, make([]byte, 100001, 100001)},
				false)
			So(err, ShouldBeNil)
			answer, err := b.Negotiate(fakeOffer)
			So(err, ShouldNotBeNil)
			So(answer, ShouldBeNil)
			So(err.Error(), ShouldResemble, "unexpected EOF")
		})
		Convey("BrokerChannel.Negotiate fails with unexpected error", func() {
			b, err := NewBrokerChannel("test.broker", "",
				&MockTransport{123, []byte("")}, false)
			So(err, ShouldBeNil)
			answer, err := b.Negotiate(fakeOffer)
			So(err, ShouldNotBeNil)
			So(answer, ShouldBeNil)
			So(err.Error(), ShouldResemble, BrokerErrorUnexpected)
		})
	})
}
package lib
import (
"container/list"
"errors"
"fmt"
"log"
"sync"
)
// Peers is a container which keeps track of multiple WebRTC remote peers.
// Implements |SnowflakeCollector|.
//
// Maintaining a set of pre-connected Peers with fresh but inactive datachannels
// allows rapid recovery when the current WebRTC Peer disconnects.
//
// Note: For now, only one remote can be active at any given moment.
// This is a property of Tor circuits & its current multiplexing constraints,
// but could be updated if that changes.
// (Also, this constraint does not necessarily apply to the more generic PT
// version of Snowflake)
type Peers struct {
	Tongue                   // catches new remote peers on Collect
	BytesLogger BytesLogger // shared traffic logger handed to popped peers

	snowflakeChan chan *WebRTCPeer // buffered hand-off to the SOCKS handler
	activePeers   *list.List       // caught, not-yet-closed peers
	melt          chan struct{}    // closed by End to stop collection
	melted        bool             // set once End has been called
	collection    sync.WaitGroup   // tracks in-flight Collect calls
}
// NewPeers constructs a fresh container of remote peers. The tongue is
// mandatory: it both catches new snowflakes and bounds (via GetMax) how
// many may be buffered for the SOCKS handler at once.
func NewPeers(tongue Tongue) (*Peers, error) {
	if tongue == nil {
		return nil, errors.New("missing Tongue to catch Snowflakes with")
	}
	return &Peers{
		Tongue: tongue,
		// Buffered channel passes snowflakes onwards to the SOCKS handler.
		snowflakeChan: make(chan *WebRTCPeer, tongue.GetMax()),
		activePeers:   list.New(),
		melt:          make(chan struct{}),
	}, nil
}
// Collect catches a new snowflake and adds it to the active collection, as
// part of the |SnowflakeCollector| interface. It fails when the collection
// has melted, the capacity returned by GetMax is already reached, or the
// catch itself errors.
func (p *Peers) Collect() (*WebRTCPeer, error) {
	// Engage the Snowflake Catching interface, which must be available.
	p.collection.Add(1)
	defer p.collection.Done()
	// NOTE(review): melted is written by End without synchronization, so
	// this read is racy — confirm whether callers guarantee ordering.
	if p.melted {
		return nil, fmt.Errorf("Snowflakes have melted")
	}
	if nil == p.Tongue {
		return nil, errors.New("missing Tongue to catch Snowflakes with")
	}
	cnt := p.Count()
	capacity := p.Tongue.GetMax()
	s := fmt.Sprintf("Currently at [%d/%d]", cnt, capacity)
	if cnt >= capacity {
		return nil, fmt.Errorf("At capacity [%d/%d]", cnt, capacity)
	}
	log.Println("WebRTC: Collecting a new Snowflake.", s)
	// BUG: some broker conflict here.
	connection, err := p.Tongue.Catch()
	if nil != err {
		return nil, err
	}
	// Track new valid Snowflake in internal collection and pass along.
	p.activePeers.PushBack(connection)
	p.snowflakeChan <- connection
	return connection, nil
}
// Pop blocks until an available, valid snowflake appears. Returns nil after End
// has been called (which closes the channel).
func (p *Peers) Pop() *WebRTCPeer {
	for snowflake := range p.snowflakeChan {
		// Discard peers that closed while waiting in the buffer.
		if snowflake.closed {
			continue
		}
		// Set to use the same rate-limited traffic logger to keep consistency.
		snowflake.BytesLogger = p.BytesLogger
		return snowflake
	}
	// Channel closed by End: no more snowflakes will arrive.
	return nil
}
// Melted signals when the collector has stopped collecting, as part of the
// |SnowflakeCollector| interface. The returned channel is closed by End.
func (p *Peers) Melted() <-chan struct{} {
	return p.melt
}

// Count returns total available Snowflakes (including the active one).
// The count only reduces when connections themselves close, rather than when
// they are popped.
func (p *Peers) Count() int {
	p.purgeClosedPeers()
	return p.activePeers.Len()
}
// purgeClosedPeers removes every peer marked closed from the active list,
// leaving live peers untouched. The next element is captured before a
// removal so iteration stays valid.
func (p *Peers) purgeClosedPeers() {
	e := p.activePeers.Front()
	for e != nil {
		next := e.Next()
		if e.Value.(*WebRTCPeer).closed {
			p.activePeers.Remove(e)
		}
		e = next
	}
}
// End closes all Peers contained here and melts the collection so that no
// further snowflakes will be collected or popped.
func (p *Peers) End() {
	// Signal Melted() listeners first, then flag Collect to refuse new
	// catches, and wait for in-flight Collect calls before closing the
	// hand-off channel.
	// NOTE(review): melted is read by Collect without synchronization —
	// confirm the intended ordering guarantees between End and Collect.
	close(p.melt)
	p.melted = true
	p.collection.Wait()
	close(p.snowflakeChan)
	cnt := p.Count()
	for e := p.activePeers.Front(); e != nil; {
		next := e.Next()
		conn := e.Value.(*WebRTCPeer)
		conn.Close()
		p.activePeers.Remove(e)
		e = next
	}
	log.Printf("WebRTC: melted all %d snowflakes.", cnt)
}
// WebRTC rendezvous requires the exchange of SessionDescriptions between
// peers in order to establish a PeerConnection.
//
// This file contains the one method currently available to Snowflake:
//
// - Domain-fronted HTTP signaling. The Broker automatically exchange offers
// and answers between this client and some remote WebRTC proxy.
package lib
import (
"bytes"
"errors"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"sync"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/common/nat"
"git.torproject.org/pluggable-transports/snowflake.git/common/util"
"github.com/pion/webrtc/v3"
)
const (
	// User-visible error strings returned by Negotiate for each broker
	// response class; the tests compare against these exact values.
	BrokerError503        string = "No snowflake proxies currently available."
	BrokerError400        string = "You sent an invalid offer in the request."
	BrokerErrorUnexpected string = "Unexpected error, no answer."
	readLimit                    = 100000 // Maximum number of bytes to be read from an HTTP response
)

// BrokerChannel is a signalling channel to the Broker.
type BrokerChannel struct {
	// The Host header to put in the HTTP request (optional and may be
	// different from the host name in URL).
	Host               string
	url                *url.URL
	transport          http.RoundTripper // Used to make all requests.
	keepLocalAddresses bool              // if false, LAN candidates are scrubbed from offers
	NATType            string            // guarded by lock
	lock               sync.Mutex
}
// We make a copy of DefaultTransport because we want the default Dial
// and TLSHandshakeTimeout settings. But we want to disable the default
// ProxyFromEnvironment setting.
func CreateBrokerTransport() http.RoundTripper {
transport := http.DefaultTransport.(*http.Transport)
transport.Proxy = nil
transport.ResponseHeaderTimeout = 15 * time.Second
return transport
}
// NewBrokerChannel constructs a new BrokerChannel, where:
// |broker| is the full URL of the facilitating program which assigns proxies
// to clients, and |front| is the optional fronting domain.
func NewBrokerChannel(broker string, front string, transport http.RoundTripper, keepLocalAddresses bool) (*BrokerChannel, error) {
	targetURL, err := url.Parse(broker)
	if err != nil {
		return nil, err
	}
	log.Println("Rendezvous using Broker at:", broker)
	bc := &BrokerChannel{
		url:                targetURL,
		transport:          transport,
		keepLocalAddresses: keepLocalAddresses,
		NATType:            nat.NATUnknown,
	}
	// Optional front domain: keep the real host for the Host header and
	// address the HTTP request to the front instead.
	if front != "" {
		log.Println("Domain fronting using:", front)
		bc.Host = bc.url.Host
		bc.url.Host = front
	}
	return bc, nil
}
func limitedRead(r io.Reader, limit int64) ([]byte, error) {
p, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: limit + 1})
if err != nil {
return p, err
} else if int64(len(p)) == limit+1 {
return p[0:limit], io.ErrUnexpectedEOF
}
return p, err
}
// Negotiate roundtrips an HTTP POST using WebRTC SessionDescriptions.
//
// Send an SDP offer to the broker, which assigns a proxy and responds
// with an SDP answer from a designated remote WebRTC peer.
func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
	*webrtc.SessionDescription, error) {
	log.Println("Negotiating via BrokerChannel...\nTarget URL: ",
		bc.Host, "\nFront URL: ", bc.url.Host)
	// Ideally, we could specify an `RTCIceTransportPolicy` that would handle
	// this for us. However, "public" was removed from the draft spec.
	// See https://developer.mozilla.org/en-US/docs/Web/API/RTCConfiguration#RTCIceTransportPolicy_enum
	if !bc.keepLocalAddresses {
		// Scrub local LAN candidates from the SDP before it leaves the host.
		offer = &webrtc.SessionDescription{
			Type: offer.Type,
			SDP:  util.StripLocalAddresses(offer.SDP),
		}
	}
	offerSDP, err := util.SerializeSessionDescription(offer)
	if err != nil {
		return nil, err
	}
	data := bytes.NewReader([]byte(offerSDP))
	// Suffix with broker's client registration handler.
	clientURL := bc.url.ResolveReference(&url.URL{Path: "client"})
	request, err := http.NewRequest("POST", clientURL.String(), data)
	if nil != err {
		return nil, err
	}
	if "" != bc.Host { // Set true host if necessary.
		request.Host = bc.Host
	}
	// Include the NAT type header; guarded by the lock because SetNATType
	// may write NATType concurrently from the probe goroutine.
	bc.lock.Lock()
	request.Header.Set("Snowflake-NAT-TYPE", bc.NATType)
	bc.lock.Unlock()
	resp, err := bc.transport.RoundTrip(request)
	if nil != err {
		return nil, err
	}
	defer resp.Body.Close()
	log.Printf("BrokerChannel Response:\n%s\n\n", resp.Status)
	switch resp.StatusCode {
	case http.StatusOK:
		// Bound the read to readLimit bytes to cap memory use.
		body, err := limitedRead(resp.Body, readLimit)
		if nil != err {
			return nil, err
		}
		log.Printf("Received answer: %s", string(body))
		return util.DeserializeSessionDescription(string(body))
	case http.StatusServiceUnavailable:
		return nil, errors.New(BrokerError503)
	case http.StatusBadRequest:
		return nil, errors.New(BrokerError400)
	default:
		return nil, errors.New(BrokerErrorUnexpected)
	}
}
// SetNATType records the client's detected NAT type, which Negotiate reports
// to the broker on each request. Safe for concurrent use.
func (bc *BrokerChannel) SetNATType(NATType string) {
	bc.lock.Lock()
	defer bc.lock.Unlock()
	bc.NATType = NATType
	log.Printf("NAT Type: %s", NATType)
}
// WebRTCDialer implements the |Tongue| interface to catch snowflakes,
// using BrokerChannel to negotiate each new connection.
type WebRTCDialer struct {
	*BrokerChannel
	webrtcConfig *webrtc.Configuration
	max          int // maximum number of snowflakes to collect
}
// NewWebRTCDialer packages a broker channel, the ICE servers to offer, and a
// snowflake capacity into a WebRTCDialer.
func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer, max int) *WebRTCDialer {
	return &WebRTCDialer{
		BrokerChannel: broker,
		webrtcConfig:  &webrtc.Configuration{ICEServers: iceServers},
		max:           max,
	}
}
// Catch initializes a WebRTC Connection by signaling through the broker.
func (w WebRTCDialer) Catch() (*WebRTCPeer, error) {
	// TODO: [#25591] Fetch ICE server information from Broker.
	// TODO: [#25596] Consider TURN servers here too.
	return NewWebRTCPeer(w.webrtcConfig, w.BrokerChannel)
}

// GetMax returns the maximum number of snowflakes to collect.
func (w WebRTCDialer) GetMax() int {
	return w.max
}
package lib
import (
"context"
"errors"
"io"
"log"
"net"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
"github.com/xtaci/kcp-go/v5"
"github.com/xtaci/smux"
)
const (
	// ReconnectTimeout is the cadence at which connectLoop retries
	// collecting a snowflake.
	ReconnectTimeout = 10 * time.Second
	SnowflakeTimeout = 20 * time.Second
	// How long to wait for the OnOpen callback on a DataChannel.
	DataChannelTimeout = 10 * time.Second
)

// dummyAddr is a placeholder net.Addr for packet conns that have no
// meaningful network addresses of their own.
type dummyAddr struct{}

func (addr dummyAddr) Network() string { return "dummy" }
func (addr dummyAddr) String() string  { return "dummy" }
// newSession returns a new smux.Session and the net.PacketConn it is running
// over. The net.PacketConn successively connects through Snowflake proxies
// pulled from snowflakes.
func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
	clientID := turbotunnel.NewClientID()
	// We build a persistent KCP session on a sequence of ephemeral WebRTC
	// connections. This dialContext tells RedialPacketConn how to get a new
	// WebRTC connection when the previous one dies. Inside each WebRTC
	// connection, we use EncapsulationPacketConn to encode packets into a
	// stream.
	dialContext := func(ctx context.Context) (net.PacketConn, error) {
		log.Printf("redialing on same connection")
		// Obtain an available WebRTC remote. May block.
		conn := snowflakes.Pop()
		if conn == nil {
			return nil, errors.New("handler: Received invalid Snowflake")
		}
		log.Println("---- Handler: snowflake assigned ----")
		// Send the magic Turbo Tunnel token.
		_, err := conn.Write(turbotunnel.Token[:])
		if err != nil {
			return nil, err
		}
		// Send ClientID prefix.
		_, err = conn.Write(clientID[:])
		if err != nil {
			return nil, err
		}
		return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
	}
	pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)
	// conn is built on the underlying RedialPacketConn—when one WebRTC
	// connection dies, another one will be found to take its place. The
	// sequence of packets across multiple WebRTC connections drives the KCP
	// engine.
	conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
	if err != nil {
		pconn.Close()
		return nil, nil, err
	}
	// Permit coalescing the payloads of consecutive sends.
	conn.SetStreamMode(true)
	// Set the maximum send and receive window sizes to a high number
	// Removes KCP bottlenecks: https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40026
	conn.SetWindowSize(65535, 65535)
	// Disable the dynamic congestion window (limit only by the
	// maximum of local and remote static windows).
	conn.SetNoDelay(
		0, // default nodelay
		0, // default interval
		0, // default resend
		1, // nc=1 => congestion window off
	)
	// On the KCP connection we overlay an smux session and stream.
	smuxConfig := smux.DefaultConfig()
	smuxConfig.Version = 2
	smuxConfig.KeepAliveTimeout = 10 * time.Minute
	sess, err := smux.Client(conn, smuxConfig)
	if err != nil {
		conn.Close()
		pconn.Close()
		return nil, nil, err
	}
	return pconn, sess, err
}
// Handler, given an accepted SOCKS connection, establishes a WebRTC
// connection to the remote peer and exchanges traffic until either side
// closes. It owns the collector goroutine, the turbo-tunnel packet conn and
// the smux session it creates, and releases all of them on every exit path.
func Handler(socks net.Conn, tongue Tongue) error {
	// Prepare to collect remote WebRTC peers.
	snowflakes, err := NewPeers(tongue)
	if err != nil {
		return err
	}
	// Use a real logger to periodically output how much traffic is happening.
	snowflakes.BytesLogger = NewBytesSyncLogger()
	log.Printf("---- Handler: begin collecting snowflakes ---")
	go connectLoop(snowflakes)
	// Create a new smux session
	log.Printf("---- Handler: starting a new session ---")
	pconn, sess, err := newSession(snowflakes)
	if err != nil {
		// BUG FIX: stop the collector before bailing out; otherwise the
		// connectLoop goroutine keeps catching snowflakes forever.
		snowflakes.End()
		return err
	}
	// On the smux session we overlay a stream.
	stream, err := sess.OpenStream()
	if err != nil {
		// BUG FIX: release the session, packet conn, and collector on
		// this error path instead of leaking all three.
		sess.Close()
		pconn.Close()
		snowflakes.End()
		return err
	}
	defer stream.Close()
	// Begin exchanging data.
	log.Printf("---- Handler: begin stream %v ---", stream.ID())
	copyLoop(socks, stream)
	log.Printf("---- Handler: closed stream %v ---", stream.ID())
	snowflakes.End()
	log.Printf("---- Handler: end collecting snowflakes ---")
	pconn.Close()
	sess.Close()
	log.Printf("---- Handler: discarding finished session ---")
	return nil
}
// connectLoop maintains |SnowflakeCapacity| number of available WebRTC
// connections, to transfer to the Tor SOCKS handler when needed. It retries
// on a fixed cadence and exits once the collector melts.
func connectLoop(snowflakes SnowflakeCollector) {
	for {
		retry := time.After(ReconnectTimeout)
		if _, err := snowflakes.Collect(); err != nil {
			log.Printf("WebRTC: %v Retrying...", err)
		}
		select {
		case <-snowflakes.Melted():
			log.Println("ConnectLoop: stopped.")
			return
		case <-retry:
			// Fall through to the next collection attempt.
		}
	}
}
// Exchanges bytes between two ReadWriters.
// (In this case, between a SOCKS connection and smux stream.)
func copyLoop(socks, stream io.ReadWriter) {
done := make(chan struct{}, 2)
go func() {
if _, err := io.Copy(socks, stream); err != nil {
log.Printf("copying WebRTC to SOCKS resulted in error: %v", err)
}
done <- struct{}{}
}()
go func() {
if _, err := io.Copy(stream, socks); err != nil {
log.Printf("copying SOCKS to stream resulted in error: %v", err)
}
done <- struct{}{}
}()
<-done
log.Println("copy loop ended")
}
package lib
import (
"bufio"
"errors"
"io"
"net"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/common/encapsulation"
)
var errNotImplemented = errors.New("not implemented")
// EncapsulationPacketConn implements the net.PacketConn interface over an
// io.ReadWriteCloser stream, using the encapsulation package to represent
// packets in a stream.
type EncapsulationPacketConn struct {
io.ReadWriteCloser
localAddr net.Addr
remoteAddr net.Addr
bw *bufio.Writer
}
// NewEncapsulationPacketConn makes
func NewEncapsulationPacketConn(
localAddr, remoteAddr net.Addr,
conn io.ReadWriteCloser,
) *EncapsulationPacketConn {
return &EncapsulationPacketConn{
ReadWriteCloser: conn,
localAddr: localAddr,
remoteAddr: remoteAddr,
bw: bufio.NewWriter(conn),
}
}
// ReadFrom reads an encapsulated packet from the stream. The returned
// address is always the remoteAddr supplied at construction time.
func (c *EncapsulationPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
	data, err := encapsulation.ReadData(c.ReadWriteCloser)
	if err != nil {
		return 0, c.remoteAddr, err
	}
	// NOTE(review): if len(p) < len(data), the excess is silently dropped
	// by copy — confirm callers always pass a large enough buffer.
	return copy(p, data), c.remoteAddr, nil
}
// WriteTo writes an encapsulated packet to the stream and flushes the
// buffered writer. The addr argument is ignored; packets always go to the
// peer of the underlying stream.
func (c *EncapsulationPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
	if _, err := encapsulation.WriteData(c.bw, p); err != nil {
		return 0, err
	}
	if err := c.bw.Flush(); err != nil {
		return 0, err
	}
	return len(p), nil
}
// LocalAddr returns the localAddr value that was passed to
// NewEncapsulationPacketConn.
func (c *EncapsulationPacketConn) LocalAddr() net.Addr {
	return c.localAddr
}
// SetDeadline, SetReadDeadline, and SetWriteDeadline are required by
// net.PacketConn but are not supported: each always returns
// errNotImplemented.
func (c *EncapsulationPacketConn) SetDeadline(t time.Time) error      { return errNotImplemented }
func (c *EncapsulationPacketConn) SetReadDeadline(t time.Time) error  { return errNotImplemented }
func (c *EncapsulationPacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }
package lib
import (
"log"
"time"
)
const (
	// LogTimeInterval is how often BytesSyncLogger emits a traffic
	// summary line.
	LogTimeInterval = 5 * time.Second
)
// BytesLogger records byte counts for traffic in each direction
// (outbound = sent, inbound = received).
type BytesLogger interface {
	AddOutbound(int)
	AddInbound(int)
}
// BytesNullLogger is the default BytesLogger; it discards all counts.
type BytesNullLogger struct{}

// AddOutbound does nothing.
func (b BytesNullLogger) AddOutbound(amount int) {}

// AddInbound does nothing.
func (b BytesNullLogger) AddInbound(amount int) {}
// BytesSyncLogger uses channels to safely log from multiple sources with output
// occurring at reasonable intervals.
type BytesSyncLogger struct {
	outboundChan chan int // byte amounts reported via AddOutbound
	inboundChan  chan int // byte amounts reported via AddInbound
}
// NewBytesSyncLogger returns a new BytesSyncLogger and starts it logging.
func NewBytesSyncLogger() *BytesSyncLogger {
	b := &BytesSyncLogger{
		outboundChan: make(chan int, 5),
		inboundChan:  make(chan int, 5),
	}
	// The logging goroutine runs for the lifetime of the process; there
	// is no mechanism to stop it.
	go b.log()
	return b
}
// log is the consumer goroutine: it accumulates byte and event counts
// from the two channels and, once per LogTimeInterval, prints a summary
// (only if there was any traffic) and resets the counters.
func (b *BytesSyncLogger) log() {
	var outbound, inbound int
	var outEvents, inEvents int
	ticker := time.NewTicker(LogTimeInterval)
	for {
		select {
		case n := <-b.outboundChan:
			outbound += n
			outEvents++
		case n := <-b.inboundChan:
			inbound += n
			inEvents++
		case <-ticker.C:
			if outEvents > 0 || inEvents > 0 {
				log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)",
					inbound, outbound, inEvents, outEvents)
			}
			outbound, outEvents = 0, 0
			inbound, inEvents = 0, 0
		}
	}
}
// AddOutbound reports amount bytes sent; delivered to the logging
// goroutine over a buffered channel.
func (b *BytesSyncLogger) AddOutbound(amount int) {
	b.outboundChan <- amount
}
// AddInbound reports amount bytes received; delivered to the logging
// goroutine over a buffered channel.
func (b *BytesSyncLogger) AddInbound(amount int) {
	b.inboundChan <- amount
}
package lib
import (
"crypto/rand"
"encoding/hex"
"errors"
"io"
"log"
"sync"
"time"
"github.com/pion/webrtc/v3"
)
// Remote WebRTC peer.
//
// Handles preparation of go-webrtc PeerConnection. Only ever has
// one DataChannel.
type WebRTCPeer struct {
	id        string // random "snowflake-"-prefixed identifier, also used as the DataChannel label
	pc        *webrtc.PeerConnection
	transport *webrtc.DataChannel

	// recvPipe/writePipe connect the DataChannel's OnMessage callback to
	// Read; the pipes outlive any individual DataChannel.
	recvPipe  *io.PipeReader
	writePipe *io.PipeWriter

	// lastReceive is written by OnMessage and polled by checkForStaleness.
	// NOTE(review): it and `closed` are accessed from multiple goroutines
	// without synchronization — confirm whether a mutex/atomic is needed.
	lastReceive time.Time

	open   chan struct{} // Channel to notify when datachannel opens
	closed bool          // set by Close; observed by checkForStaleness

	once sync.Once // Synchronization for PeerConnection destruction

	BytesLogger BytesLogger // traffic accounting; BytesNullLogger by default
}
// NewWebRTCPeer constructs a WebRTC PeerConnection and negotiates it
// through the broker. On any connection failure the partially-built
// peer is closed and the error returned.
func NewWebRTCPeer(config *webrtc.Configuration,
	broker *BrokerChannel) (*WebRTCPeer, error) {
	p := new(WebRTCPeer)

	// Give the peer a random, printable identifier.
	var idBytes [8]byte
	if _, err := rand.Read(idBytes[:]); err != nil {
		panic(err)
	}
	p.id = "snowflake-" + hex.EncodeToString(idBytes[:])

	// Override with something that's not NullLogger to have real logging.
	p.BytesLogger = &BytesNullLogger{}

	// Pipes remain the same even when DataChannel gets switched.
	p.recvPipe, p.writePipe = io.Pipe()

	if err := p.connect(config, broker); err != nil {
		p.Close()
		return nil, err
	}
	return p, nil
}
// Read drains bytes that arrived from the remote peer (written into
// writePipe by the DataChannel's OnMessage handler).
// As part of |io.ReadWriter|
func (c *WebRTCPeer) Read(b []byte) (int, error) {
	return c.recvPipe.Read(b)
}
// Write sends b to the remote WebRTC peer over the DataChannel and, on
// success, records the byte count with BytesLogger.
// As part of |io.ReadWriter|
func (c *WebRTCPeer) Write(b []byte) (int, error) {
	err := c.transport.Send(b)
	if err != nil {
		return 0, err
	}
	c.BytesLogger.AddOutbound(len(b))
	return len(b), nil
}
// Close marks the peer closed and tears down its channels and
// transports. Safe to call multiple times: sync.Once ensures the
// cleanup runs at most once. Always returns nil.
func (c *WebRTCPeer) Close() error {
	c.once.Do(func() {
		c.closed = true
		c.cleanup()
		log.Printf("WebRTC: Closing")
	})
	return nil
}
// checkForStaleness prevents long-lived broken remotes: it polls once
// per second and closes the connection when no message has been
// received for SnowflakeTimeout. Runs on its own goroutine, started by
// connect.
// Should also update the DataChannel in underlying go-webrtc's to make Closes
// more immediate / responsive.
//
// NOTE(review): lastReceive and closed are also touched from other
// goroutines without synchronization — confirm whether a mutex/atomic
// is needed.
func (c *WebRTCPeer) checkForStaleness() {
	c.lastReceive = time.Now()
	// One reusable ticker instead of allocating a fresh timer via
	// time.After on every iteration.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		if c.closed {
			return
		}
		if time.Since(c.lastReceive) > SnowflakeTimeout {
			log.Printf("WebRTC: No messages received for %v -- closing stale connection.",
				SnowflakeTimeout)
			c.Close()
			return
		}
		<-ticker.C
	}
}
// connect prepares a fresh PeerConnection, exchanges the offer/answer
// via the broker, and waits (up to DataChannelTimeout) for the
// DataChannel to open before starting the staleness watchdog.
func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel) error {
	log.Println(c.id, " connecting...")
	// TODO: When go-webrtc is more stable, it's possible that a new
	// PeerConnection won't need to be re-prepared each time.
	// BUGFIX: this error was previously discarded; on failure c.pc can
	// be unusable (or nil), so bail out instead of dereferencing it.
	if err := c.preparePeerConnection(config); err != nil {
		return err
	}
	answer, err := broker.Negotiate(c.pc.LocalDescription())
	if err != nil {
		return err
	}
	log.Printf("Received Answer.\n")
	err = c.pc.SetRemoteDescription(*answer)
	if nil != err {
		log.Println("WebRTC: Unable to SetRemoteDescription:", err)
		return err
	}
	// Wait for the datachannel to open or time out
	select {
	case <-c.open:
	case <-time.After(DataChannelTimeout):
		c.transport.Close()
		return errors.New("timeout waiting for DataChannel.OnOpen")
	}
	go c.checkForStaleness()
	return nil
}
// preparePeerConnection creates a new WebRTC PeerConnection with a
// single ordered DataChannel whose handlers are wired to the peer's
// pipes, sets the local offer, and returns after ICE candidate
// gathering is complete.
func (c *WebRTCPeer) preparePeerConnection(config *webrtc.Configuration) error {
	var err error
	c.pc, err = webrtc.NewPeerConnection(*config)
	if err != nil {
		log.Printf("NewPeerConnection ERROR: %s", err)
		return err
	}
	ordered := true
	dataChannelOptions := &webrtc.DataChannelInit{
		Ordered: &ordered,
	}
	// Initialize the open-notification channel before registering any
	// handler that closes it, so OnOpen can never observe a nil channel.
	c.open = make(chan struct{})
	// We must create the data channel before creating an offer
	// https://github.com/pion/webrtc/wiki/Release-WebRTC@v3.0.0
	dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
	if err != nil {
		log.Printf("CreateDataChannel ERROR: %s", err)
		// Close the PeerConnection as on the error paths below, rather
		// than leaking it.
		c.pc.Close()
		return err
	}
	dc.OnOpen(func() {
		log.Println("WebRTC: DataChannel.OnOpen")
		close(c.open)
	})
	dc.OnClose(func() {
		log.Println("WebRTC: DataChannel.OnClose")
		c.Close()
	})
	dc.OnMessage(func(msg webrtc.DataChannelMessage) {
		if len(msg.Data) <= 0 {
			log.Println("0 length message---")
		}
		n, err := c.writePipe.Write(msg.Data)
		c.BytesLogger.AddInbound(n)
		if err != nil {
			// TODO: Maybe shouldn't actually close.
			log.Println("Error writing to SOCKS pipe")
			if inerr := c.writePipe.CloseWithError(err); inerr != nil {
				log.Printf("c.writePipe.CloseWithError returned error: %v", inerr)
			}
		}
		c.lastReceive = time.Now()
	})
	c.transport = dc
	log.Println("WebRTC: DataChannel created.")
	// Allow candidates to accumulate until ICEGatheringStateComplete.
	done := webrtc.GatheringCompletePromise(c.pc)
	offer, err := c.pc.CreateOffer(nil)
	// TODO: Potentially timeout and retry if ICE isn't working.
	if err != nil {
		log.Println("Failed to prepare offer", err)
		c.pc.Close()
		return err
	}
	log.Println("WebRTC: Created offer")
	err = c.pc.SetLocalDescription(offer)
	if err != nil {
		log.Println("Failed to prepare offer", err)
		c.pc.Close()
		return err
	}
	log.Println("WebRTC: Set local description")
	<-done // Wait for ICE candidate gathering to complete.
	log.Println("WebRTC: PeerConnection created.")
	return nil
}
// cleanup closes the SOCKS-side pipe, the DataChannel, and the
// PeerConnection. Invoked exactly once, from Close.
func (c *WebRTCPeer) cleanup() {
	// Close this side of the SOCKS pipe.
	if c.writePipe != nil { // c.writePipe can be nil in tests.
		c.writePipe.Close()
	}
	if c.transport != nil {
		log.Printf("WebRTC: closing DataChannel")
		c.transport.Close()
	}
	if c.pc == nil {
		return
	}
	log.Printf("WebRTC: closing PeerConnection")
	if err := c.pc.Close(); err != nil {
		log.Printf("Error closing peerconnection...")
	}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment