Skip to content

Paxos quorum read #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func (b *Benchmark) Run() {
close(keys)
stat := Statistic(b.latency)

log.Infof("Concurrency = %d", b.Concurrency)
log.Infof("Write Ratio = %f", b.W)
log.Infof("Number of Keys = %d", b.K)
log.Infof("Benchmark took %v\n", t)
log.Infof("Throughput %f\n", float64(len(b.latency))/t.Seconds())
log.Info(stat)
Expand Down
4 changes: 4 additions & 0 deletions checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sort"

"github.com/ailidani/paxi/lib"
"github.com/ailidani/paxi/log"
)

// A simple linearizability checker based on https://pdos.csail.mit.edu/6.824/papers/fb-consistency.pdf
Expand Down Expand Up @@ -90,6 +91,9 @@ func (c *checker) linearizable(history []*operation) []*operation {
cycle := c.Graph.Cycle()
if cycle != nil {
anomaly = append(anomaly, o)
//bval := make([]byte, 0)
//binary.BigEndian.PutUint64(bval, anomal)
log.Debugf("Anomaly: %v", anomaly)
for _, u := range cycle {
for _, v := range cycle {
if c.Graph.From(u).Has(v) && u.(*operation).start > v.(*operation).end {
Expand Down
174 changes: 169 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,17 @@ import (

"github.com/ailidani/paxi/lib"
"github.com/ailidani/paxi/log"
"encoding/binary"
)

const NO_BARRIER_SLOT = -1

type pqrTuple struct{
val Value
slot int
ID ID
}

// Client interface provides get and put for key value store
type Client interface {
Get(Key) (Value, error)
Expand All @@ -37,18 +46,21 @@ type HTTPClient struct {
N int // total number of nodes
LocalN int // number of nodes in local zone

PQR bool

cid int // command id
*http.Client
}

// NewHTTPClient creates a new Client from config
func NewHTTPClient(id ID) *HTTPClient {
func NewHTTPClient(id ID, paxosQuorumRead bool) *HTTPClient {
c := &HTTPClient{
ID: id,
N: len(config.Addrs),
Addrs: config.Addrs,
HTTP: config.HTTPAddrs,
Client: &http.Client{},
PQR: paxosQuorumRead,
}
if id != "" {
i := 0
Expand All @@ -74,6 +86,55 @@ func (c *HTTPClient) GetURL(id ID, key Key) string {
return c.HTTP[id] + "/" + strconv.Itoa(int(key))
}

func (c *HTTPClient) restPaxosQuorumRead(id ID, key Key, barrierSlot int) (Value, int, error) {
url := c.HTTP[id] + "/pqr/" + strconv.Itoa(int(key))

method := http.MethodGet
var body io.Reader
r, err := http.NewRequest(method, url, body)
if err != nil {
log.Error(err)
return nil, NO_TIMESTAMP, err
}
r.Header.Set(HTTPClientID, string(c.ID))
r.Header.Set(HTTPCommandID, strconv.Itoa(c.cid))
r.Header.Set(HTTPSlot, strconv.Itoa(barrierSlot))

res, err := c.Client.Do(r)
if err != nil {
log.Error(err)
return nil, NO_TIMESTAMP, err
}

defer res.Body.Close()
if res.StatusCode == http.StatusOK {
strSlot := res.Header.Get(HTTPSlot)
slot, err := strconv.Atoi(strSlot)
if err != nil {
log.Error(err)
return nil, NO_TIMESTAMP, err
}
noVal := res.Header.Get(HTTPNoVal)
if noVal == "" {
b, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Error(err)
return nil, NO_TIMESTAMP, err
}
retVal := Value(b)
log.Debugf("node=%v type=%s key=%v slot=%d value=%x", id, method, key, slot, retVal)
return retVal, int(slot), nil
} else {
return nil, int(slot), nil
}

}

dump, _ := httputil.DumpResponse(res, true)
log.Debugf("%q", dump)
return nil, NO_TIMESTAMP, errors.New(res.Status)
}

// rest accesses server's REST API with url = http://ip:port/key
// if value == nil, it's a read
func (c *HTTPClient) rest(id ID, key Key, value Value) (Value, map[string]string, error) {
Expand Down Expand Up @@ -141,9 +202,13 @@ func (c *HTTPClient) RESTPut(id ID, key Key, value Value) (Value, map[string]str
// Get gets value of given key (use REST)
// Default implementation of Client interface
func (c *HTTPClient) Get(key Key) (Value, error) {
c.cid++
v, _, err := c.RESTGet(c.ID, key)
return v, err
if c.PQR {
return c.PaxosQuorumGet(key)
} else {
c.cid++
v, _, err := c.RESTGet(c.ID, key)
return v, err
}
}

// Put puts new key value pair and return previous value (use REST)
Expand Down Expand Up @@ -171,7 +236,6 @@ func (c *HTTPClient) json(id ID, key Key, value Value) (Value, error) {
defer res.Body.Close()
if res.StatusCode == http.StatusOK {
b, _ := ioutil.ReadAll(res.Body)
log.Debugf("key=%v value=%x", key, Value(b))
return Value(b), nil
}
dump, _ := httputil.DumpResponse(res, true)
Expand All @@ -191,6 +255,106 @@ func (c *HTTPClient) JSONPut(key Key, value Value) (Value, error) {
return c.json(c.ID, key, value)
}


// PaxosQuorumGet concurrently read values from majority paxos nodes in 2 phases
func (c *HTTPClient) paxosQuorumGetPhase(key Key, barrierSlot int) ([]pqrTuple, error) {
c.cid++
out := make(chan pqrTuple, c.N/2 + 1)
i := 0
for id := range c.HTTP {
if i > c.N/2 {
break
}
if id == "1.1" {
continue // skipping hard-coded leader
}
i++
go func(id ID) {
log.Debugf("Sending PQR to %v\n", id)
v, slot, err := c.restPaxosQuorumRead(id, key, barrierSlot)

if err != nil {
log.Error(err)
out <- pqrTuple{nil, -1, ID("0.0")}
return
}
log.Debugf("Received response %d, %d from %v\n", v, slot, id)
out <- pqrTuple{v, slot, id}
}(id)
}
results := make([]pqrTuple, 0)
for ; i > 0; i-- {

res := <-out
log.Debugf("Collected response %v from %v, awaiting %d more responses\n", res, res.ID, i-1)
if res.slot == -1 {
return nil, errors.New("An Error has occured doing PQR")
}
results = append(results, res)
}
return results, nil
}

func (c *HTTPClient) PaxosQuorumGet(key Key) (Value, error) {
p1Results, err := c.paxosQuorumGetPhase(key, NO_BARRIER_SLOT)
if err != nil {
return nil, err
}

maxSlot := -1
valSlot := -1
var val Value
val = nil
valint := 0
for _, v := range p1Results {
if v.slot > maxSlot {
maxSlot = v.slot // v.slot here is last accepted slot at that node
}
if v.val != nil && v.slot == maxSlot {
val = v.val
valSlot = v.slot // v.slot here is last executed slot
}
}
if len(val) == 10 {
x, _ := binary.Uvarint(val)
valint = int(x)
} else {
valint = -1
}
log.Debugf("Finished PQR phase 1. barrierSlot: %d, val: %d (%v)\n", maxSlot, valint, val)
if maxSlot == valSlot && val != nil {
log.Debugf("Returning after PQR phase 1. barrierSlot: %d, val: %d (%v)\n", maxSlot, valint, val)
return val, nil
}

log.Debugf("Starting PQR phase 2+. barrierSlot: %d, val: %d (%v)\n", maxSlot, valint, val)
val = nil
for val == nil {
log.Debugf("Doing PQR phase 2+. barrierSlot: %d, val: %d (%v)\n", maxSlot, valint, val)
p2Results, err := c.paxosQuorumGetPhase(key, maxSlot)
if err != nil {
return nil, err
}
for _, v := range p2Results {
if v.slot >= maxSlot{
val = v.val
if len(val) == 10 {
x, _ := binary.Uvarint(val)
valint = int(x)
} else {
valint = -1
}
log.Debugf("PQR phase 2+ Value Change. barrierSlot: %d, val: %d (%v)\n", maxSlot, valint, val)
break
}
}
}
log.Debugf("Returning after PQR phase 2. barrierSlot: %d, val: %d (%v)\n", maxSlot, valint, val)
return val, nil


}

// QuorumGet concurrently read values from majority nodes
func (c *HTTPClient) QuorumGet(key Key) ([]Value, []map[string]string) {
valueC := make(chan Value)
Expand Down
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package main
import (
"encoding/binary"
"flag"

"github.com/ailidani/paxi"
)

var id = flag.String("id", "", "node id this client connects to")
var api = flag.String("api", "", "Client API type [rest, json, quorum]")
var load = flag.Bool("load", false, "Load K keys into DB")
var master = flag.String("master", "", "Master address.")
var paxosQuorumRead = flag.Bool("pqr", false, "Use Paxos Quourm Reads")

// db implements Paxi.DB interface for benchmarking
type db struct {
Expand Down Expand Up @@ -51,7 +51,7 @@ func main() {
}

d := new(db)
d.Client = paxi.NewHTTPClient(paxi.ID(*id))
d.Client = paxi.NewHTTPClient(paxi.ID(*id), *paxosQuorumRead)

b := paxi.NewBenchmark(d)
if *load {
Expand Down
2 changes: 1 addition & 1 deletion cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func main() {
paxi.ConnectToMaster(*master, true, paxi.ID(*id))
}

client = paxi.NewHTTPClient(paxi.ID(*id))
client = paxi.NewHTTPClient(paxi.ID(*id), false)

if len(flag.Args()) > 0 {
run(flag.Args()[0], flag.Args()[1:])
Expand Down
9 changes: 7 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"encoding/json"
"fmt"
"sync"
"github.com/ailidani/paxi/log"
)
const NO_TIMESTAMP = -1
const NO_SLOT_NUMBER_IN_VALUE = -1

// Key of key value database
type Key int

// Value of key value database
type Value []byte
type Value []byte

// Command of key value database
type Command struct {
Expand Down Expand Up @@ -101,7 +104,7 @@ func (d *database) Execute(c interface{}) interface{} {
func (d *database) Execute(c Command) Value {
d.Lock()
defer d.Unlock()

log.Debugf("Entering Execute method (cmd: %v)", c)
// get previous value
v := d.data[c.Key]

Expand Down Expand Up @@ -135,7 +138,9 @@ func (d *database) put(k Key, v Value) {
func (d *database) Put(k Key, v Value) {
d.Lock()
defer d.Unlock()
log.Debugf("Entering Put method (k: %v, v: %v)", k, v)
d.put(k, v)

}

// Version returns current version of given key
Expand Down
Loading