adding rudp for retransmission

This commit is contained in:
херетик
2023-03-31 12:33:56 +01:00
parent d7d06ddff0
commit 5087b32d73
22 changed files with 1343 additions and 98 deletions

9
go.mod
View File

@@ -22,7 +22,6 @@ require (
github.com/naoina/toml v0.1.1
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.15.0
github.com/templexxx/reedsolomon v1.1.3
github.com/tutorialedge/go-grpc-tutorial v0.0.0-20200509091100-f8d1b5b15b01
go.uber.org/atomic v1.10.0
golang.org/x/crypto v0.6.0
@@ -128,6 +127,7 @@ require (
github.com/kkdai/bstream v1.0.0 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/klauspost/cpuid/v2 v2.2.1 // indirect
github.com/klauspost/reedsolomon v1.11.7 // indirect
github.com/koron/go-ssdp v0.0.3 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/lib/pq v1.10.6 // indirect
@@ -207,13 +207,20 @@ require (
github.com/stretchr/testify v1.8.1 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/templexxx/cpu v0.0.1 // indirect
github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect
github.com/templexxx/reedsolomon v1.1.3 // indirect
github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b // indirect
github.com/templexxx/xorsimd v0.1.1 // indirect
github.com/thebagchi/sctp-go v1.1.0 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/u35s/rudp v0.0.0-20190524081740-bcc26b6b3828 // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/xanzy/ssh-agent v0.2.1 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect
github.com/xtaci/kcp-go v4.3.4+incompatible // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/api/v3 v3.5.6 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect

16
go.sum
View File

@@ -823,6 +823,8 @@ github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.1 h1:U33DW0aiEj633gHYw3LoDNfkDiYnE5Q8M/TKJn2f2jI=
github.com/klauspost/cpuid/v2 v2.2.1/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/klauspost/reedsolomon v1.11.7 h1:9uaHU0slncktTEEg4+7Vl7q7XUNMBUOK4R9gnKhMjAU=
github.com/klauspost/reedsolomon v1.11.7/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -1300,16 +1302,26 @@ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cb
github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I=
github.com/templexxx/cpu v0.0.1 h1:hY4WdLOgKdc8y13EYklu9OUTXik80BkxHoWvTO6MQQY=
github.com/templexxx/cpu v0.0.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk=
github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 h1:89CEmDvlq/F7SJEOqkIdNDGJXrQIhuIx9D2DBXjavSU=
github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161/go.mod h1:wM7WEvslTq+iOEAMDLSzhVuOt5BRZ05WirO+b09GHQU=
github.com/templexxx/reedsolomon v1.1.3 h1:UJZtgOAcp8Ldl9Qp9/8YIPHfW58vsUrPHcJYv15Na50=
github.com/templexxx/reedsolomon v1.1.3/go.mod h1:lCyQlNrc8GTWsFE47kSLkJJvsGL8Lo5pfUDPiJZMm3o=
github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b h1:fj5tQ8acgNUr6O8LEplsxDhUIe2573iLkJc+PqnzZTI=
github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b/go.mod h1:5XA7W9S6mni3h5uvOC75dA3m9CCCaS83lltmc0ukdi4=
github.com/templexxx/xorsimd v0.1.1 h1:Y4e8YgMx/4xRJO4G6lq0bSswfDCxbIrGu7KqM2ET524=
github.com/templexxx/xorsimd v0.1.1/go.mod h1:W+ffZz8jJMH2SXwuKu9WhygqBMbFnp14G2fqEr8qaNo=
github.com/thebagchi/sctp-go v1.1.0 h1:5Q8eewkJbMBOWNOknB16IR7ASUYhBWKIV/8YP6CnsGA=
github.com/thebagchi/sctp-go v1.1.0/go.mod h1:9pnZDU8hRpB1tJlToJKXZZekmyaxWAJgBnDdP6Gbz5I=
github.com/tjfoc/gmsm v1.4.1 h1:aMe1GlZb+0bLjn+cKTPEvvn9oUEBlJitaZiiBwsbgho=
github.com/tjfoc/gmsm v1.4.1/go.mod h1:j4INPkHWMrhJb38G+J6W4Tw0AbuN8Thu3PbdVYhVcTE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tutorialedge/go-grpc-tutorial v0.0.0-20200509091100-f8d1b5b15b01 h1:dVcYTEMB9QwRqslGxkvYGXI3fL7twjdbTTADm7J3TU8=
github.com/tutorialedge/go-grpc-tutorial v0.0.0-20200509091100-f8d1b5b15b01/go.mod h1:LBdTyeyzGK4jRGHXkC3z2BFf/Fml8peKISp1OtHDmvs=
github.com/u35s/rudp v0.0.0-20190524081740-bcc26b6b3828 h1:Y9BPl7MuQm8GMA+zqOvWv/JTuJkMyA7ol+QHLtuH330=
github.com/u35s/rudp v0.0.0-20190524081740-bcc26b6b3828/go.mod h1:RoS9SX0MyvunzHkI0fntze/G3apwbDXz12uvz48QA1s=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
@@ -1342,6 +1354,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 h1:QldyIu/L63oPpyvQmHgvgickp1Yw510KJOqX7H24mg8=
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/xtaci/kcp-go v4.3.4+incompatible h1:T56s9GLhx+KZUn5T8aO2Didfa4uTYvjeVIRLt6uYdhE=
github.com/xtaci/kcp-go v4.3.4+incompatible/go.mod h1:bN6vIwHQbfHaHtFpEssmWsN45a+AZwO7eyRCmEIbtvE=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -1463,6 +1477,7 @@ golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
@@ -1561,6 +1576,7 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=

View File

@@ -1,27 +1,22 @@
package pub
import (
"testing"
"git-indra.lan/indra-labs/indra/pkg/crypto/key/prv"
)
func TestBase32(t *testing.T) {
for i := 0; i < 1000; i++ {
var k *prv.Key
var e error
if k, e = prv.GenerateKey(); check(e) {
t.Error(e)
t.FailNow()
}
p := Derive(k)
b32 := p.ToBase32()
log.I.Ln(b32)
var kk *Key
kk, e = FromBase32(b32)
if b32 != kk.ToBase32() {
t.Error(e)
t.FailNow()
}
}
}
//
// func TestBase32(t *testing.T) {
// for i := 0; i < 1000; i++ {
// var k *prv.Key
// var e error
// if k, e = prv.GenerateKey(); check(e) {
// t.Error(e)
// t.FailNow()
// }
// p := Derive(k)
// b32 := p.ToBase32()
// log.I.Ln(b32)
// var kk *Key
// kk, e = FromBase32(b32)
// if b32 != kk.ToBase32() {
// t.Error(e)
// t.FailNow()
// }
// }
// }

View File

@@ -1,29 +0,0 @@
package engine
import (
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
type Pipeline struct {
in ByteChan
Transport
}
func NewPipeline(bufs int) *Pipeline {
return &Pipeline{in: make(ByteChan, bufs)}
}
func (p *Pipeline) Send(b slice.Bytes) {
if p.Transport != nil {
p.Transport.Send(b)
} else {
p.in <- b
}
}
func (p *Pipeline) Receive() <-chan slice.Bytes {
if p.Transport != nil {
return p.Transport.Receive()
} else {
return p.in
}
}

View File

@@ -1,27 +0,0 @@
package engine
import (
"testing"
"github.com/cybriq/qu"
)
func TestNewPipeline(t *testing.T) {
pA := NewPipeline(1)
pB := NewPipeline(1)
pA.Transport = pB
q := qu.T()
go func() {
log.I.F("a->b '%s'", string(<-pB.Receive()))
q.Q()
}()
pA.Send([]byte("testing testing 1 2 3"))
<-q.Wait()
q = qu.T()
go func() {
log.I.F("b->a '%s'", string(<-pA.Receive()))
q.Q()
}()
pB.Send([]byte("testing testing 1 2 3"))
<-q.Wait()
}

20
pkg/engine/tpt_rudp.go Normal file
View File

@@ -0,0 +1,20 @@
package engine
import (
"net/netip"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
type RUDP struct {
endpoint *netip.AddrPort
in, out ByteChan
}
func (k RUDP) Send(b slice.Bytes) {
k.out <- b
}
func (k RUDP) Receive() <-chan slice.Bytes {
return k.in
}

View File

@@ -7,9 +7,11 @@ import (
type ByteChan chan slice.Bytes
func NewSim(bufs int) ByteChan { return make(ByteChan, bufs) }
func (s ByteChan) Send(b slice.Bytes) {
s <- b
}
func (s ByteChan) Receive() <-chan slice.Bytes {
return s
}

View File

@@ -8,12 +8,13 @@ import (
"runtime"
"strings"
"syscall"
"go.uber.org/atomic"
"golang.org/x/sys/unix"
"git-indra.lan/indra-labs/indra"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
"github.com/kardianos/osext"
)
@@ -33,7 +34,7 @@ var (
// ch is used to receive SIGINT (Ctrl+C) signals.
ch chan os.Signal
// signals is the list of signals that cause the interrupt
signals = []os.Signal{os.Interrupt}
signals = []os.Signal{syscall.SIGINT}
// ShutdownRequestChan is a channel that can receive shutdown requests
ShutdownRequestChan = make(chan struct{})
// addHandlerChan is used to add an interrupt handler to the list of
@@ -75,17 +76,17 @@ func Listener() {
}
log.I.Ln("restarting")
if runtime.GOOS != "windows" {
e = syscall.Exec(file, os.Args, os.Environ())
e = unix.Exec(file, os.Args, os.Environ())
if e != nil {
log.I.Ln(e)
}
} else {
log.I.Ln("doing windows restart")
// procAttr := new(os.ProcAttr)
// procAttr.Files = []*os.File{os.Stdin, os.Stdout, os.Stderr}
// os.StartProcess(os.Args[0], os.Args[1:], procAttr)
var s []string
// s = []string{"cmd.exe", "/C", "start"}
s = append(s, os.Args[0])

View File

@@ -11,7 +11,15 @@ import (
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
const ErrEmptyBytes = "cannot encode empty bytes"
const (
ErrEmptyBytes = "cannot encode empty bytes"
ErrDupe = "found duplicate packet, no redundancy, decoding failed"
ErrLostNoRedundant = "no redundancy with %d lost of %d"
ErrMismatch = "found disagreement about common data in segment %d of %d" +
" in field %s value: got %v expected %v"
ErrNotEnough = "too many lost to recover in section %d, have %d, need " +
"%d minimum"
)
// Split creates a series of packets including the defined Reed Solomon
// parameters for extra parity shards and the return encryption public key for a
@@ -41,13 +49,6 @@ func Split(pp Params, segSize int) (packets [][]byte, e error) {
return
}
const ErrDupe = "found duplicate packet, no redundancy, decoding failed"
const ErrLostNoRedundant = "no redundancy with %d lost of %d"
const ErrMismatch = "found disagreement about common data in segment %d of %d" +
" in field %s value: got %v expected %v"
const ErrNotEnough = "too many lost to recover in section %d, have %d, need " +
"%d minimum"
// Join a collection of Packets together.
func Join(packets Packets) (msg []byte, e error) {
if len(packets) == 0 {

View File

@@ -44,11 +44,14 @@ func TestSplitJoin(t *testing.T) {
var pkt *Packet
var from *pub.Key
var to cloak.PubKey
_ = to
if from, to, e = GetKeys(splitted[i]); check(e) {
log.I.Ln(i)
continue
}
if !cloak.Match(to, rP.ToBytes()) {
t.Error("failed to match cloak")
t.FailNow()
}
if pkt, e = Decode(splitted[i], from, rp); check(e) {
t.Error(e)
}

113
pkg/rudp/README.md Normal file
View File

@@ -0,0 +1,113 @@
[![Build Status](https://travis-ci.org/u35s/rudp.svg?branch=master)](https://travis-ci.org/u35s/rudp)
[![Coverage Status](https://coveralls.io/repos/github/u35s/rudp/badge.svg)](https://coveralls.io/github/u35s/rudp)
# rudp
rudp采用请求回应机制,实现了UDP的可靠传输,即接收方检查是否丢失数据,然后向发送方请求丢失的数据,因此发送方必须保留已经发送过的数据一定时间来回应数据丢失。为了减小发送方数据保留量,在每收到n个包时通知发送方n之前的包已经收到可以清除了,另外超过设定的包超时时间后也会清除。
# 使用
1 创建rudp对象
```golang
rudp := rudp.New()
```
2 发送消息,n 发送的的消息长度,err 是否出错
```golang
n ,err := rudp.Send(bts []byte)
```
3 接受消息,n 返回接受到的的消息长度,err 是否出错
```golang
n , err := rudp.Recv(data []byte)
```
4 更新时间获取要发送的消息,如果设置的sendDelay大于更新tick,update返回nil,下次调用时间到时会返回所有的消息链表
```golang
var package *Package = rudp.Update(tick int)
```
5 相关设置
```golang
rudp.SetCorruptTick(n int) //设置超过n个tick连接丢失
rudp.SetExpiredTick(n int) //设置发送的消息最大保留n个tick
rudp.SetSendDelayTick(n int) //设置n个tick发送一次消息包
rudp.SetMissingTime(n int) //设置n纳秒没有收到消息包就认为消息丢失请求重发
```
# 兼容tcp
另外rudp也实现了tcp的相关接口,很容易改造现有的tcp项目为rudp
### 服务端
1 监听udp端口
```golang
addr := &net.UDPAddr{IP: net.ParseIP("0.0.0.0"), Port: 9981}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
fmt.Println(err)
return
}
```
2 接受连接
```golang
listener := rudp.NewListener(conn)
rconn, err := listener.AcceptRudp()
if err != nil {
fmt.Printf("accept err %v\n", err)
return
}
```
3 读取消息
```golang
data := make([]byte, rudp.MAX_PACKAGE)
n, err := rconn.Read(data)
if err != nil {
fmt.Printf("read err %s\n", err)
return
}
```
4 发送消息
```golang
n , err := rconn.Write([]byte("hello rudp"))
```
### 客户端
1 拨号
```golang
raddr := net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 9981}
//raddr := net.UDPAddr{IP: net.ParseIP("47.89.180.105"), Port: 9981}
laddr := net.UDPAddr{IP: net.IPv4zero, Port: 0}
conn, err := net.DialUDP("udp", &laddr, &raddr)
if err != nil {
fmt.Println(err)
return
}
```
2 创建conn
```golang
rconn := rudp.NewConn(conn, rudp.New())
```
3 发送消息,同服务端
4 接受消息,同服务端
### 相关设置
```golang
rudp.SetAtuoSend(bool) 设置rudp是否自动发送消息
rudp.SetSendTick() 设置发送的间隔(为0时自动发送消息不启用)
rudp.SetMaxSendNumPerTick() 设置每个tick可以最大发送的消息数量
```
# Links
1. https://github.com/cloudwu/rudp --rudp in c
2. https://blog.codingnow.com/2016/03/reliable_udp.html --blog of rudp

25
pkg/rudp/conf.go Normal file
View File

@@ -0,0 +1,25 @@
package rudp
import "time"
//rudp
var corruptTick int = 5
var expiredTick int = 1e2 * 60 * 5 //5 minute on sendTick 1e7
var sendDelayTick int = 1
var missingTime int = 1e7
func SetCorruptTick(tick int) { corruptTick = tick }
func SetExpiredTick(tick int) { expiredTick = tick }
func SetSendDelayTick(tick int) { sendDelayTick = tick }
func SetMissingTime(miss int) { missingTime = miss }
//rudp conn
var debug bool = false
var autoSend bool = true
var sendTick time.Duration = 1e7
var maxSendNumPerTick int = 500
func SetDebug(d bool) { debug = d }
func SetAtuoSend(send bool) { autoSend = send }
func SetSendTick(tick time.Duration) { sendTick = tick }
func SetMaxSendNumPerTick(n int) { maxSendNumPerTick = n }

209
pkg/rudp/conn.go Normal file
View File

@@ -0,0 +1,209 @@
package rudp
import (
"net"
"time"
)
func NewConn(conn *net.UDPConn, rudp *Rudp) *RudpConn {
con := &RudpConn{conn: conn, rudp: rudp,
recvChan: make(chan []byte, 1<<16), recvErr: make(chan error, 2),
sendChan: make(chan []byte, 1<<16), sendErr: make(chan error, 2),
SendTick: make(chan int, 2),
}
go con.run()
return con
}
func NewUnConn(conn *net.UDPConn, remoteAddr *net.UDPAddr, rudp *Rudp, close func(string)) *RudpConn {
con := &RudpConn{conn: conn, rudp: rudp, SendTick: make(chan int, 2),
recvChan: make(chan []byte, 1<<16), recvErr: make(chan error, 2),
sendChan: make(chan []byte, 1<<16), sendErr: make(chan error, 2),
closef: close, remoteAddr: remoteAddr, in: make(chan []byte, 1<<16),
}
go con.run()
return con
}
type RudpConn struct {
conn *net.UDPConn
rudp *Rudp
recvChan chan []byte
recvErr chan error
sendChan chan []byte
sendErr chan error
SendTick chan int
//unconected
remoteAddr *net.UDPAddr
closef func(addr string)
in chan []byte
}
func (rc *RudpConn) SetDeadline(t time.Time) error { return nil }
func (rc *RudpConn) SetReadDeadline(t time.Time) error { return nil }
func (rc *RudpConn) SetWriteDeadline(t time.Time) error { return nil }
func (rc *RudpConn) LocalAddr() net.Addr { return rc.conn.LocalAddr() }
func (rc *RudpConn) Connected() bool { return rc.remoteAddr == nil }
func (rc *RudpConn) RemoteAddr() net.Addr {
if rc.remoteAddr != nil {
return rc.remoteAddr
}
return rc.conn.RemoteAddr()
}
func (rc *RudpConn) Close() error {
var err error
if rc.remoteAddr != nil {
if rc.closef != nil {
rc.closef(rc.remoteAddr.String())
}
_, err = rc.conn.WriteToUDP([]byte{TYPE_CORRUPT}, rc.remoteAddr)
rc.in <- []byte{TYPE_EOF}
} else {
_, err = rc.conn.Write([]byte{TYPE_CORRUPT})
}
checkErr(err)
return err
}
func (rc *RudpConn) Read(bts []byte) (n int, err error) {
select {
case data := <-rc.recvChan:
copy(bts, data)
return len(data), nil
case err := <-rc.recvErr:
return 0, err
}
}
func (rc *RudpConn) send(bts []byte) (err error) {
select {
case rc.sendChan <- bts:
return nil
case err := <-rc.sendErr:
return err
}
}
func (rc *RudpConn) Write(bts []byte) (n int, err error) {
sz := len(bts)
for len(bts)+MAX_MSG_HEAD > GENERAL_PACKAGE {
if err := rc.send(bts[:GENERAL_PACKAGE-MAX_MSG_HEAD]); err != nil {
return 0, err
}
bts = bts[GENERAL_PACKAGE-MAX_MSG_HEAD:]
}
return sz, rc.send(bts)
}
func (rc *RudpConn) rudpRecv(data []byte) error {
for {
n, err := rc.rudp.Recv(data)
if err != nil {
rc.recvErr <- err
return err
} else if n == 0 {
break
}
bts := make([]byte, n)
copy(bts, data[:n])
rc.recvChan <- bts
}
return nil
}
func (rc *RudpConn) conectedRecvLoop() {
data := make([]byte, MAX_PACKAGE)
for {
n, err := rc.conn.Read(data)
if err != nil {
rc.recvErr <- err
return
}
rc.rudp.Input(data[:n])
if rc.rudpRecv(data) != nil {
return
}
}
}
func (rc *RudpConn) unconectedRecvLoop() {
data := make([]byte, MAX_PACKAGE)
for {
select {
case bts := <-rc.in:
rc.rudp.Input(bts)
if rc.rudpRecv(data) != nil {
return
}
}
}
}
func (rc *RudpConn) sendLoop() {
var sendNum int
for {
select {
case tick := <-rc.SendTick:
sendOut:
for {
select {
case bts := <-rc.sendChan:
_, err := rc.rudp.Send(bts)
if err != nil {
rc.sendErr <- err
return
}
sendNum++
if sendNum >= maxSendNumPerTick {
break sendOut
}
default:
break sendOut
}
}
sendNum = 0
p := rc.rudp.Update(tick)
var num, sz int
for p != nil {
n, err := int(0), error(nil)
if rc.Connected() {
n, err = rc.conn.Write(p.Bts)
} else {
n, err = rc.conn.WriteToUDP(p.Bts, rc.remoteAddr)
}
if err != nil {
rc.sendErr <- err
return
}
sz, num = sz+n, num+1
p = p.Next
}
if num > 1 {
show := bitShow(sz * int(time.Second/sendTick))
dbg("send package num %v,sz %v, %v/s,local %v,remote %v",
num, show, show, rc.LocalAddr(), rc.RemoteAddr())
}
}
}
}
func (rc *RudpConn) run() {
if autoSend && sendTick > 0 {
go func() {
tick := time.Tick(sendTick)
for {
select {
case <-tick:
rc.SendTick <- 1
}
}
}()
}
go func() {
if rc.Connected() {
rc.conectedRecvLoop()
} else {
rc.unconectedRecvLoop()
}
}()
rc.sendLoop()
}

View File

@@ -0,0 +1,43 @@
package main
import (
"fmt"
"net"
"os"
"os/signal"
"syscall"
"time"
"git-indra.lan/indra-labs/rudp"
)
func main() {
raddr := net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 9981}
// raddr := net.UDPAddr{IP: net.ParseIP("47.89.180.105"), Port: 9981}
laddr := net.UDPAddr{IP: net.IPv4zero, Port: 0}
conn, err := net.DialUDP("udp", &laddr, &raddr)
if err != nil {
fmt.Println(err)
return
}
rconn := rudp.NewConn(conn, rudp.New())
defer func() { fmt.Println("defer close", rconn.Close()) }()
go func() {
bts := make([]byte, 1)
for i := uint8(0); ; i++ {
bts[0] = byte(i)
_, err := rconn.Write(bts)
if err != nil {
fmt.Printf("write err %v\n", err)
os.Exit(1)
break
}
time.Sleep(1e9)
}
}()
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT)
select {
case <-signalChan:
}
}

View File

@@ -0,0 +1,92 @@
package main
import (
"fmt"
"git-indra.lan/indra-labs/rudp"
)
var dumpIdx int
func dumpRecv(U *rudp.Rudp) {
bts := make([]byte, rudp.MAX_PACKAGE)
for {
n, err := U.Recv(bts)
if err != nil {
fmt.Println(err)
break
} else if n == 0 {
break
}
fmt.Printf("RECV ")
for i := 0; i < n; i++ {
fmt.Printf("%02x ", bts[i])
}
fmt.Println()
}
}
func dump(p *rudp.Package) {
fmt.Printf("%d : ", dumpIdx)
dumpIdx++
for p != nil {
fmt.Printf("(")
for i := range p.Bts {
fmt.Printf("%02x ", p.Bts[i])
}
fmt.Printf(") ")
p = p.Next
}
fmt.Println()
}
func main() {
// fmt.Println("vim-go")
udp := rudp.New()
t1 := []byte{1, 2, 3, 4}
t2 := []byte{5, 6, 7, 8}
t3 := []byte{
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 10, 11, 12, 13,
}
t4 := []byte{4, 3, 2, 1}
udp.Send(t1)
udp.Send(t2)
dump(udp.Update(1)) // dump0
dump(udp.Update(1)) // dump1
udp.Send(t3)
udp.Send(t4)
dump(udp.Update(1)) // dump2
r1 := []byte{rudp.TYPE_REQUEST, 00, 00, 00, 00, rudp.TYPE_REQUEST, 00, 03, 00, 03}
udp.Input(r1)
dump(udp.Update(1)) // dump3
dumpRecv(udp)
r2 := []byte{rudp.TYPE_NORMAL + 1, 0, 1, 1,
rudp.TYPE_NORMAL + 1, 0, 3, 3}
udp.Input(r2)
dump(udp.Update(1)) // dump4
dumpRecv(udp)
r3 := []byte{rudp.TYPE_NORMAL + 1, 0, 0, 0,
rudp.TYPE_NORMAL + 1, 0, 5, 5}
udp.Input(r3)
r4 := []byte{rudp.TYPE_NORMAL + 1, 0, 2, 2}
udp.Input(r4)
dump(udp.Update(1)) // dump5
dumpRecv(udp)
}

View File

@@ -0,0 +1,56 @@
package main
import (
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
"git-indra.lan/indra-labs/rudp"
)
func read(conn *rudp.RudpConn) {
for {
data := make([]byte, rudp.MAX_PACKAGE)
n, err := conn.Read(data)
if err != nil {
fmt.Printf("read err %s\n", err)
break
}
fmt.Printf("receive ")
for i := range data[:n] {
v := int(data[i])
fmt.Printf("%d", v)
}
fmt.Printf(" from <%v>\n", conn.RemoteAddr())
}
}
func main() {
log.SetOutput(os.Stdout)
addr := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 9981}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
fmt.Println(err)
return
}
listener := rudp.NewListener(conn)
defer func() { fmt.Println("defer close", listener.Close()) }()
go func() {
for {
rconn, err := listener.AcceptRudp()
if err != nil {
fmt.Printf("accept err %v\n", err)
break
}
go read(rconn)
}
}()
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT)
select {
case <-signalChan:
}
}

3
pkg/rudp/go.mod Normal file
View File

@@ -0,0 +1,3 @@
module git-indra.lan/indra-labs/rudp
go 1.19

2
pkg/rudp/go.sum Normal file
View File

@@ -0,0 +1,2 @@
github.com/u35s/rudp v0.0.0-20190524081740-bcc26b6b3828 h1:Y9BPl7MuQm8GMA+zqOvWv/JTuJkMyA7ol+QHLtuH330=
github.com/u35s/rudp v0.0.0-20190524081740-bcc26b6b3828/go.mod h1:RoS9SX0MyvunzHkI0fntze/G3apwbDXz12uvz48QA1s=

79
pkg/rudp/listener.go Normal file
View File

@@ -0,0 +1,79 @@
package rudp
import (
"net"
"sync"
)
func NewListener(conn *net.UDPConn) *RudpListener {
listen := &RudpListener{conn: conn,
newRudpConn: make(chan *RudpConn, 1024),
newRudpErr: make(chan error, 12),
rudpConnMap: make(map[string]*RudpConn)}
go listen.run()
return listen
}
type RudpListener struct {
conn *net.UDPConn
lock sync.RWMutex
newRudpConn chan *RudpConn
newRudpErr chan error
rudpConnMap map[string]*RudpConn
}
//net listener interface
func (this *RudpListener) Accept() (net.Conn, error) { return this.AcceptRudp() }
func (this *RudpListener) Close() error {
this.CloseAllRudp()
return this.conn.Close()
}
func (this *RudpListener) Addr() net.Addr { return this.conn.LocalAddr() }
func (this *RudpListener) CloseRudp(addr string) {
this.lock.Lock()
delete(this.rudpConnMap, addr)
this.lock.Unlock()
}
func (this *RudpListener) CloseAllRudp() {
this.lock.Lock()
for _, rconn := range this.rudpConnMap {
rconn.closef = nil
rconn.Close()
}
this.lock.Unlock()
}
func (this *RudpListener) AcceptRudp() (*RudpConn, error) {
select {
case c := <-this.newRudpConn:
return c, nil
case e := <-this.newRudpErr:
return nil, e
}
}
func (this *RudpListener) run() {
data := make([]byte, MAX_PACKAGE)
for {
n, remoteAddr, err := this.conn.ReadFromUDP(data)
if err != nil {
this.CloseAllRudp()
this.newRudpErr <- err
return
}
this.lock.RLock()
rudpConn, ok := this.rudpConnMap[remoteAddr.String()]
this.lock.RUnlock()
if !ok {
rudpConn = NewUnConn(this.conn, remoteAddr, New(), this.CloseRudp)
this.lock.Lock()
this.rudpConnMap[remoteAddr.String()] = rudpConn
this.lock.Unlock()
this.newRudpConn <- rudpConn
}
bts := make([]byte, n)
copy(bts, data[:n])
rudpConn.in <- bts
}
}

457
pkg/rudp/rudp.go Normal file
View File

@@ -0,0 +1,457 @@
package rudp
import (
"bytes"
"errors"
"sync/atomic"
"time"
)
const (
TYPE_PING = iota
TYPE_EOF
TYPE_CORRUPT
TYPE_REQUEST
TYPE_MISSING
TYPE_NORMAL
)
const (
MAX_MSG_HEAD = 4
GENERAL_PACKAGE = 576 - 60 - 8
MAX_PACKAGE = 0x7fff - TYPE_NORMAL
)
const (
ERROR_NIL int32 = iota
ERROR_EOF
ERROR_REMOTE_EOF
ERROR_CORRUPT
ERROR_MSG_SIZE
)
type Error struct {
v int32
}
func (e *Error) Load() int32 { return atomic.LoadInt32(&e.v) }
func (e *Error) Store(n int32) { atomic.StoreInt32(&e.v, n) }
func (e *Error) Error() error {
switch e.Load() {
case ERROR_EOF:
return errors.New("EOF")
case ERROR_REMOTE_EOF:
return errors.New("remote EOF")
case ERROR_CORRUPT:
return errors.New("corrupt")
case ERROR_MSG_SIZE:
return errors.New("recive msg size error")
default:
return nil
}
}
type Package struct {
Next *Package
Bts []byte
}
type packageBuffer struct {
tmp bytes.Buffer
num int
head *Package
tail *Package
}
func (tmp *packageBuffer) packRequest(min, max int, tag int) {
if tmp.tmp.Len()+5 > GENERAL_PACKAGE {
tmp.newPackage()
}
tmp.tmp.WriteByte(byte(tag))
tmp.tmp.WriteByte(byte((min & 0xff00) >> 8))
tmp.tmp.WriteByte(byte(min & 0xff))
tmp.tmp.WriteByte(byte((max & 0xff00) >> 8))
tmp.tmp.WriteByte(byte(max & 0xff))
}
func (tmp *packageBuffer) fillHeader(head, id int) {
if head < 128 {
tmp.tmp.WriteByte(byte(head))
} else {
tmp.tmp.WriteByte(byte(((head & 0x7f00) >> 8) | 0x80))
tmp.tmp.WriteByte(byte(head & 0xff))
}
tmp.tmp.WriteByte(byte((id & 0xff00) >> 8))
tmp.tmp.WriteByte(byte(id & 0xff))
}
func (tmp *packageBuffer) packMessage(m *message) {
if m.buf.Len()+4+tmp.tmp.Len() >= GENERAL_PACKAGE {
tmp.newPackage()
}
tmp.fillHeader(m.buf.Len()+TYPE_NORMAL, m.id)
tmp.tmp.Write(m.buf.Bytes())
}
func (tmp *packageBuffer) newPackage() {
if tmp.tmp.Len() <= 0 {
return
}
p := &Package{Bts: make([]byte, tmp.tmp.Len())}
copy(p.Bts, tmp.tmp.Bytes())
tmp.tmp.Reset()
tmp.num++
if tmp.tail == nil {
tmp.head = p
tmp.tail = p
} else {
tmp.tail.Next = p
tmp.tail = p
}
}
func New() *Rudp {
return &Rudp{reqSendAgain: make(chan [2]int, 1<<10), addSendAgain: make(chan [2]int, 1<<10), recvSkip: make(map[int]int)}
}
type Rudp struct {
recvQueue messageQueue
recvSkip map[int]int
reqSendAgain chan [2]int
recvIDMin int
recvIDMax int
sendQueue messageQueue
sendHistory messageQueue
addSendAgain chan [2]int
sendID int
corrupt Error
currentTick int
lastRecvTick int
lastExpiredTick int
lastSendDelayTick int
}
func (r *Rudp) Recv(bts []byte) (int, error) {
if err := r.corrupt.Load(); err != ERROR_NIL {
return 0, r.corrupt.Error()
}
m := r.recvQueue.pop(r.recvIDMin)
if m == nil {
return 0, nil
}
r.recvIDMin++
copy(bts, m.buf.Bytes())
return m.buf.Len(), nil
}
func (r *Rudp) Send(bts []byte) (n int, err error) {
if err := r.corrupt.Load(); err != ERROR_NIL {
return 0, r.corrupt.Error()
}
if len(bts) > MAX_PACKAGE {
return 0, nil
}
m := &message{}
m.buf.Write(bts)
m.id = r.sendID
r.sendID++
m.tick = r.currentTick
r.sendQueue.push(m)
return len(bts), nil
}
func (r *Rudp) Update(tick int) *Package {
if r.corrupt.Load() != ERROR_NIL {
return nil
}
r.currentTick += tick
if r.currentTick >= r.lastExpiredTick+expiredTick {
r.lastExpiredTick = r.currentTick
r.clearSendExpired()
}
if r.currentTick >= r.lastRecvTick+corruptTick {
r.corrupt.Store(ERROR_CORRUPT)
}
if r.currentTick >= r.lastSendDelayTick+sendDelayTick {
r.lastSendDelayTick = r.currentTick
return r.outPut()
}
return nil
}
type message struct {
next *message
buf bytes.Buffer
id int
tick int
}
type messageQueue struct {
head *message
tail *message
num int
}
func (r *messageQueue) pop(id int) *message {
if r.head == nil {
return nil
}
m := r.head
if id >= 0 && m.id != id {
return nil
}
r.head = m.next
m.next = nil
if r.head == nil {
r.tail = nil
}
r.num--
return m
}
func (r *messageQueue) push(m *message) {
if r.tail == nil {
r.head = m
r.tail = m
} else {
r.tail.next = m
r.tail = m
}
r.num++
}
func (r *Rudp) getID(max int, bt1, bt2 byte) int {
n1, n2 := int(bt1), int(bt2)
id := n1*256 + n2
id |= max & ^0xffff
if id < max-0x8000 {
id += 0x10000
dbg("id < max-0x8000 ,net %v,id %v,min %v,max %v,cur %v",
n1*256+n2, id, r.recvIDMin, max, id+0x10000)
} else if id > max+0x8000 {
id -= 0x10000
dbg("id > max-0x8000 ,net %v,id %v,min %v,max %v,cur %v",
n1*256+n2, id, r.recvIDMin, max, id+0x10000)
}
return id
}
func (r *Rudp) outPut() *Package {
var tmp packageBuffer
r.reqMissing(&tmp)
r.replyRequest(&tmp)
r.sendMessage(&tmp)
if tmp.head == nil && tmp.tmp.Len() == 0 {
tmp.tmp.WriteByte(byte(TYPE_PING))
}
tmp.newPackage()
return tmp.head
}
func (r *Rudp) Input(bts []byte) {
sz := len(bts)
if sz > 0 {
r.lastRecvTick = r.currentTick
}
for sz > 0 {
len := int(bts[0])
if len > 127 {
if sz <= 1 {
r.corrupt.Store(ERROR_MSG_SIZE)
return
}
len = (len*256 + int(bts[1])) & 0x7fff
bts = bts[2:]
sz -= 2
} else {
bts = bts[1:]
sz -= 1
}
switch len {
case TYPE_PING:
r.checkMissing(false)
case TYPE_EOF:
r.corrupt.Store(ERROR_EOF)
case TYPE_CORRUPT:
r.corrupt.Store(ERROR_REMOTE_EOF)
return
case TYPE_REQUEST, TYPE_MISSING:
if sz < 4 {
r.corrupt.Store(ERROR_MSG_SIZE)
return
}
exe := r.addRequest
max := r.sendID
if len == TYPE_MISSING {
exe = r.addMissing
max = r.recvIDMax
}
exe(r.getID(max, bts[0], bts[1]), r.getID(max, bts[2], bts[3]))
bts = bts[4:]
sz -= 4
default:
len -= TYPE_NORMAL
if sz < len+2 {
r.corrupt.Store(ERROR_MSG_SIZE)
return
}
r.insertMessage(r.getID(r.recvIDMax, bts[0], bts[1]), bts[2:len+2])
bts = bts[len+2:]
sz -= len + 2
}
}
r.checkMissing(false)
}
func (r *Rudp) checkMissing(direct bool) {
head := r.recvQueue.head
if head != nil && head.id > r.recvIDMin {
nano := int(time.Now().UnixNano())
last := r.recvSkip[r.recvIDMin]
if !direct && last == 0 {
r.recvSkip[r.recvIDMin] = nano
dbg("miss start %v-%v,max %v", r.recvIDMin, head.id-1, r.recvIDMax)
} else if direct || last+missingTime < nano {
delete(r.recvSkip, r.recvIDMin)
r.reqSendAgain <- [2]int{r.recvIDMin, head.id - 1}
dbg("req miss %v-%v,direct %v,wait num %v",
r.recvIDMin, head.id-1, direct, r.recvQueue.num)
}
}
}
func (r *Rudp) insertMessage(id int, bts []byte) {
if id < r.recvIDMin {
dbg("already recv %v,len %v", id, len(bts))
return
}
delete(r.recvSkip, id)
if id > r.recvIDMax || r.recvQueue.head == nil {
m := &message{}
m.buf.Write(bts)
m.id = id
r.recvQueue.push(m)
r.recvIDMax = id
} else {
m := r.recvQueue.head
last := &r.recvQueue.head
for m != nil {
if m.id == id {
dbg("repeat recv id %v,len %v", id, len(bts))
} else if m.id > id {
tmp := &message{}
tmp.buf.Write(bts)
tmp.id = id
tmp.next = m
*last = tmp
r.recvQueue.num++
return
}
last = &m.next
m = m.next
}
}
}
func (r *Rudp) sendMessage(tmp *packageBuffer) {
m := r.sendQueue.head
for m != nil {
tmp.packMessage(m)
m = m.next
}
if r.sendQueue.head != nil {
if r.sendHistory.tail == nil {
r.sendHistory = r.sendQueue
} else {
r.sendHistory.tail.next = r.sendQueue.head
r.sendHistory.tail = r.sendQueue.tail
}
r.sendQueue.head = nil
r.sendQueue.tail = nil
}
}
func (r *Rudp) clearSendExpired() {
m := r.sendHistory.head
for m != nil {
if m.tick >= r.lastExpiredTick {
break
}
m = m.next
}
r.sendHistory.head = m
if m == nil {
r.sendHistory.tail = nil
}
}
func (r *Rudp) addRequest(min, max int) {
dbg("add request %v-%v,max send id %v", min, max, r.sendID)
r.addSendAgain <- [2]int{min, max}
}
func (r *Rudp) addMissing(min, max int) {
if max < r.recvIDMin {
dbg("add missing %v-%v fail,already recv,min %v", min, max, r.recvIDMin)
return
}
if min > r.recvIDMin {
dbg("add missing %v-%v fail, more than min %v", min, max, r.recvIDMin)
return
}
head := 0
if r.recvQueue.head != nil {
head = r.recvQueue.head.id
}
dbg("add missing %v-%v,min %v,head %v", min, max, r.recvIDMin, head)
r.recvIDMin = max + 1
r.checkMissing(true)
}
func (r *Rudp) replyRequest(tmp *packageBuffer) {
for {
select {
case again := <-r.addSendAgain:
history := r.sendHistory.head
min, max := again[0], again[1]
if history == nil || max < history.id {
dbg("send again miss %v-%v,send max %v", min, max, r.sendID)
tmp.packRequest(min, max, TYPE_MISSING)
} else {
var start, end, num int
for {
if history == nil || max < history.id {
//expired
break
} else if min <= history.id {
tmp.packMessage(history)
if start == 0 {
start = history.id
}
end = history.id
num++
}
history = history.next
}
if min < start {
tmp.packRequest(min, start-1, TYPE_MISSING)
dbg("send again miss %v-%v,send max %v", min, start-1, r.sendID)
}
dbg("send again %v-%v of %v-%v,all %v,max send id %v", start, end, min, max, num, r.sendID)
}
default:
return
}
}
}
func (r *Rudp) reqMissing(tmp *packageBuffer) {
for {
select {
case req := <-r.reqSendAgain:
tmp.packRequest(req[0], req[1], TYPE_REQUEST)
default:
return
}
}
}

146
pkg/rudp/rudp_test.go Normal file
View File

@@ -0,0 +1,146 @@
package rudp
import (
"net"
"testing"
)
func Test_bitShow(t *testing.T) {
if show := bitShow(1023); show != "1023 b" {
t.Errorf("byte show error %v,show %v", 1023, show)
}
if show := bitShow(1025); show != "1 Kb" {
t.Errorf("byte show error %v,show %v", 1025, show)
}
if show := bitShow(1024*1024 + 1); show != "1 Mb" {
t.Errorf("byte show error %v,show %v", 1024*1024+1, show)
}
}
func Test_Rudp(t *testing.T) {
udp := New()
t1 := []byte{1, 2, 3, 4}
t2 := []byte{5, 6, 7, 8}
t3 := []byte{
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 1, 1, 1, 3,
2, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 1, 10, 11, 12, 13,
}
t4 := []byte{4, 3, 2, 1}
send := func(bts []byte) {
n, err := udp.Send(bts)
if err != nil {
t.Error(err)
}
if n != len(bts) {
t.Errorf("send length error,send %v,realy %v", n, len(bts))
}
}
send(t1)
send(t2)
if udp.Update(0) != nil {
t.Errorf("update 0 return error")
}
pkg := udp.Update(sendDelayTick)
sendLen := func(p *Package) (sz int) {
for p != nil {
sz += len(p.Bts)
p = p.Next
}
return
}
if sendLen(pkg) != len(t1)+3+len(t2)+3 {
t.Errorf("out pkg t1,t2 length error,out %v,realy %v",
sendLen(pkg), len(t1)+3+len(t2)+3)
}
pkg = udp.Update(sendDelayTick)
if pkg == nil || len(pkg.Bts) != 1 {
t.Errorf("ping error,pkg %v", pkg)
}
send(t3)
send(t4)
pkg = udp.Update(sendDelayTick)
if sendLen(pkg) != len(t3)+4+len(t4)+3 {
t.Errorf("out pkg t3,t4 length error,out %v,realy %v",
sendLen(pkg), len(t3)+4+len(t4)+3)
}
r1 := []byte{TYPE_REQUEST, 00, 00, 00, 00, TYPE_REQUEST, 00, 03, 00, 03}
udp.Input(r1)
pkg = udp.Update(sendDelayTick)
if sendLen(pkg) != len(t1)+3+len(t4)+3 {
t.Errorf("miss out pkg t1,t4 length error,out %v,realy %v",
sendLen(pkg), len(t1)+3+len(t4)+3)
}
}
func Test_RudpConn(t *testing.T) {
addr := &net.UDPAddr{IP: net.ParseIP("0.0.0.0"), Port: 9981}
sconn, err := net.ListenUDP("udp", addr)
if err != nil {
return
}
listener := NewListener(sconn)
var send = []byte{'h', 'e', 'l', 'l', 'o'}
go func() {
rconn, err := listener.AcceptRudp()
if err != nil {
t.Error(err)
return
}
data := make([]byte, MAX_PACKAGE)
n, err := rconn.Read(data)
if err != nil {
t.Error(err)
return
}
if string(data[:n]) != string(send) {
t.Error(err)
return
}
rconn.Write(send)
if err := rconn.Close(); err != nil {
t.Error(err)
}
}()
raddr := net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 9981}
laddr := net.UDPAddr{IP: net.IPv4zero, Port: 0}
cconn, err := net.DialUDP("udp", &laddr, &raddr)
if err != nil {
t.Error(err)
return
}
rconn := NewConn(cconn, New())
rconn.Write(send)
data := make([]byte, MAX_PACKAGE)
n, err := rconn.Read(data)
if err != nil {
return
}
if string(data[:n]) != string(send) {
t.Error(err)
return
}
if err := rconn.Close(); err != nil {
t.Error(err)
}
if err := listener.Close(); err != nil {
t.Error(err)
}
}

31
pkg/rudp/util.go Normal file
View File

@@ -0,0 +1,31 @@
package rudp
import (
"fmt"
"log"
)
func dbg(format string, v ...interface{}) {
if debug {
log.Printf(format, v...)
}
}
func checkErr(err error) {
if err != nil {
log.Printf("%v", err)
}
}
func bitShow(n int) string {
var ext string = "b"
if n >= 1024 {
n /= 1024
ext = "Kb"
}
if n >= 1024 {
n /= 1024
ext = "Mb"
}
return fmt.Sprintf("%v %v", n, ext)
}