Skip to content

Commit

Permalink
log: ties all the segments together
Browse files Browse the repository at this point in the history
  • Loading branch information
Brijeshlakkad committed Jun 9, 2022
1 parent 5a76591 commit ef982fb
Show file tree
Hide file tree
Showing 2 changed files with 360 additions and 0 deletions.
225 changes: 225 additions & 0 deletions internal/log/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// START: begin
package log

import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"

api "github.com/Brijeshlakkad/distributedlogging/api/v1"
)

// Log manages an ordered list of segments, appending records to the
// active (newest) segment and reading records from whichever segment
// covers the requested offset.
type Log struct {
	mu sync.RWMutex // guards segments and activeSegment

	Dir    string // directory holding the segment store/index files
	Config Config // segment size limits and initial offset

	activeSegment *segment   // segment that new appends are written to
	segments      []*segment // all segments, ordered oldest to newest
}

// END: begin

// START: newlog
// NewLog creates a Log rooted at dir, applying a 1024-byte default to
// any unset store/index size limit, and then loads existing segments
// from disk (or creates the first one) via setup.
func NewLog(dir string, c Config) (*Log, error) {
	const defaultMaxBytes = 1024
	if c.Segment.MaxStoreBytes == 0 {
		c.Segment.MaxStoreBytes = defaultMaxBytes
	}
	if c.Segment.MaxIndexBytes == 0 {
		c.Segment.MaxIndexBytes = defaultMaxBytes
	}
	log := &Log{
		Dir:    dir,
		Config: c,
	}
	return log, log.setup()
}

// END: newlog

// START: setup
// setup scans the log directory for existing segment files, recreates
// one segment per unique base offset in ascending order, and creates an
// initial segment when the directory holds none.
func (l *Log) setup() error {
	files, err := ioutil.ReadDir(l.Dir)
	if err != nil {
		return err
	}
	var baseOffsets []uint64
	for _, file := range files {
		offStr := strings.TrimSuffix(
			file.Name(),
			path.Ext(file.Name()),
		)
		// Skip files whose names aren't numeric offsets; previously a
		// parse failure was ignored and silently contributed a bogus
		// base offset of 0 (and broke the index/store pairing below).
		off, err := strconv.ParseUint(offStr, 10, 0)
		if err != nil {
			continue
		}
		baseOffsets = append(baseOffsets, off)
	}
	sort.Slice(baseOffsets, func(i, j int) bool {
		return baseOffsets[i] < baseOffsets[j]
	})
	// Each base offset appears twice (once for the index file, once for
	// the store file); create only one segment per unique offset. The
	// value-based dedup is robust even if a pair is incomplete, unlike
	// the previous blind skip-every-other-entry approach.
	var lastOff uint64
	haveLast := false
	for _, off := range baseOffsets {
		if haveLast && off == lastOff {
			continue
		}
		if err = l.newSegment(off); err != nil {
			return err
		}
		lastOff, haveLast = off, true
	}
	if l.segments == nil {
		if err = l.newSegment(l.Config.Segment.InitialOffset); err != nil {
			return err
		}
	}
	return nil
}

// END: setup

// START: append
// Append writes record to the active segment and returns the offset it
// was stored at. When the write maxes out the active segment, a fresh
// segment is created for subsequent appends.
func (l *Log) Append(record *api.Record) (uint64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	highestOffset, err := l.activeSegment.Append(record)
	if err != nil {
		return 0, err
	}
	if !l.activeSegment.IsMaxed() {
		return highestOffset, nil
	}
	// Roll over: the next record starts a new segment.
	err = l.newSegment(highestOffset + 1)
	return highestOffset, err
}

// END: append

// START: read
// Read returns the record stored at offset off, or an error when off
// falls outside the range covered by the log's segments.
func (l *Log) Read(off uint64) (*api.Record, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()

	var found *segment
	for _, seg := range l.segments {
		// A segment covers [baseOffset, nextOffset).
		if off >= seg.baseOffset && off < seg.nextOffset {
			found = seg
			break
		}
	}
	if found == nil || found.nextOffset <= off {
		return nil, fmt.Errorf("offset out of range: %d", off)
	}
	return found.Read(off)
}

// END: read

// START: newsegment
// newSegment creates a segment starting at off, makes it the active
// segment, and appends it to the segment list.
func (l *Log) newSegment(off uint64) error {
	seg, err := newSegment(l.Dir, off, l.Config)
	if err != nil {
		return err
	}
	l.activeSegment = seg
	l.segments = append(l.segments, seg)
	return nil
}

// END: newsegment

// START: close
// Close closes every segment in the log, stopping at the first error.
func (l *Log) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, seg := range l.segments {
		if err := seg.Close(); err != nil {
			return err
		}
	}
	return nil
}

// Remove closes the log and then deletes its directory along with all
// segment files inside it.
func (l *Log) Remove() error {
	err := l.Close()
	if err != nil {
		return err
	}
	return os.RemoveAll(l.Dir)
}

// Reset deletes the log from disk and recreates it empty in place.
func (l *Log) Reset() error {
	err := l.Remove()
	if err == nil {
		err = l.setup()
	}
	return err
}

// END: close

// START: offsets
// LowestOffset returns the base offset of the oldest segment. Together
// with HighestOffset it tells which nodes hold the oldest/newest data
// and which are falling behind and need to replicate.
func (l *Log) LowestOffset() (uint64, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	oldest := l.segments[0]
	return oldest.baseOffset, nil
}

// HighestOffset returns the offset of the newest record, or 0 when the
// log is empty (the newest segment has had nothing appended yet).
func (l *Log) HighestOffset() (uint64, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	next := l.segments[len(l.segments)-1].nextOffset
	if next > 0 {
		return next - 1, nil
	}
	return 0, nil
}

// END: offsets

// START: truncate
// Truncate removes every segment whose highest offset is at or below
// lowest. (In the future, called periodically) to drop old segments
// whose data has hopefully been processed and is no longer needed.
func (l *Log) Truncate(lowest uint64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	var kept []*segment
	for _, seg := range l.segments {
		// Keep segments that contain any offset above lowest.
		if seg.nextOffset > lowest+1 {
			kept = append(kept, seg)
			continue
		}
		if err := seg.Remove(); err != nil {
			return err
		}
	}
	l.segments = kept
	return nil
}

// END: truncate

// START: reader
// Reader returns an io.Reader that streams the raw store contents of
// every segment, concatenated oldest to newest.
func (l *Log) Reader() io.Reader {
	l.mu.RLock()
	defer l.mu.RUnlock()
	var readers []io.Reader
	for _, seg := range l.segments {
		// Each segment is read from position 0 of its store.
		readers = append(readers, &originReader{seg.store, 0})
	}
	return io.MultiReader(readers...)
}

// originReader adapts a segment's store to the io.Reader interface,
// reading from the beginning and tracking its own position.
type originReader struct {
	*store
	off int64 // position of the next read within the store
}

// Read implements io.Reader by delegating to the embedded store's
// ReadAt and advancing the offset by the number of bytes read.
func (o *originReader) Read(p []byte) (int, error) {
	n, err := o.ReadAt(p, o.off)
	o.off += int64(n)
	return n, err
}

// END: reader
135 changes: 135 additions & 0 deletions internal/log/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// START: intro
package log

import (
"io/ioutil"
"os"
"testing"

api "github.com/Brijeshlakkad/distributedlogging/api/v1"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

// TestLog runs each scenario against a fresh Log backed by a temporary
// directory with a small (32-byte) store limit so segment rollover is
// exercised by multi-record scenarios.
func TestLog(t *testing.T) {
	scenarios := map[string]func(t *testing.T, log *Log){
		"append and read a record succeeds": testAppendRead,
		"offset out of range error":         testOutOfRangeErr,
		"init with existing segments":       testInitExisting,
		"reader":                            testReader,
		"truncate":                          testTruncate,
	}
	for name, fn := range scenarios {
		t.Run(name, func(t *testing.T) {
			dir, err := ioutil.TempDir("", "store-test")
			require.NoError(t, err)
			defer os.RemoveAll(dir)

			c := Config{}
			c.Segment.MaxStoreBytes = 32
			log, err := NewLog(dir, c)
			require.NoError(t, err)

			fn(t, log)
		})
	}
}

// END: intro

// START: append_read
// testAppendRead verifies that a record appended at offset 0 can be
// read back with an identical value.
func testAppendRead(t *testing.T, log *Log) {
	// Renamed from "append", which shadowed the builtin append.
	record := &api.Record{
		Value: []byte("hello world"),
	}
	off, err := log.Append(record)
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)

	read, err := log.Read(off)
	require.NoError(t, err)
	require.Equal(t, record.Value, read.Value)
}

// END: append_read

// START: out_of_range
// testOutOfRangeErr verifies that reading past the end of an empty log
// returns an error and no record.
func testOutOfRangeErr(t *testing.T, log *Log) {
	record, err := log.Read(1)
	require.Error(t, err)
	require.Nil(t, record)
}

// END: out_of_range

// START: init_existing
// testInitExisting verifies that a new Log opened over an existing
// directory recovers the previously written segments and reports the
// same lowest/highest offsets.
func testInitExisting(t *testing.T, o *Log) {
	// Renamed from "append", which shadowed the builtin append.
	record := &api.Record{
		Value: []byte("hello world"),
	}
	for i := 0; i < 3; i++ {
		_, err := o.Append(record)
		require.NoError(t, err)
	}
	require.NoError(t, o.Close())

	off, err := o.LowestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)
	off, err = o.HighestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(2), off)

	// Reopen the same directory; setup must rebuild the segments.
	n, err := NewLog(o.Dir, o.Config)
	require.NoError(t, err)

	off, err = n.LowestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)
	off, err = n.HighestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(2), off)
}

// END: init_existing

// START: reader
// testReader verifies that Reader streams the raw store bytes of the
// whole log and that the first record can be decoded from them.
func testReader(t *testing.T, log *Log) {
	// Renamed from "append", which shadowed the builtin append.
	record := &api.Record{
		Value: []byte("hello world"),
	}
	off, err := log.Append(record)
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)

	reader := log.Reader()
	b, err := ioutil.ReadAll(reader)
	require.NoError(t, err)

	// The store prefixes each record with a lenWidth-byte length
	// header; skip it before unmarshaling.
	read := &api.Record{}
	err = proto.Unmarshal(b[lenWidth:], read)
	require.NoError(t, err)
	require.Equal(t, record.Value, read.Value)
}

// END: reader

// START: truncate
// testTruncate verifies that truncating at offset 1 removes the
// segments holding offsets 0 and 1, so reading offset 0 fails.
func testTruncate(t *testing.T, log *Log) {
	// Renamed from "append", which shadowed the builtin append.
	record := &api.Record{
		Value: []byte("hello world"),
	}
	for i := 0; i < 3; i++ {
		_, err := log.Append(record)
		require.NoError(t, err)
	}

	err := log.Truncate(1)
	require.NoError(t, err)

	// Offsets 0 and 1 were truncated away.
	_, err = log.Read(0)
	require.Error(t, err)
}

// END: truncate

0 comments on commit ef982fb

Please sign in to comment.