BoltDB 基础概念

BoltDB 基础

Posted by WR on December 25, 2022

BoltDB 是什么?

​ BoltDB 是一个纯粹的 k/v 存储数据库,本质上是一个本地的单机数据库系统。这个项目的目的是为了提供一个简单、快速、可信赖的数据库系统,在完全不需要诸如:mysql,postgres 数据库服务的情况下。

基本概念

创建数据库

​ DB 是最顶层的对象,它是你磁盘上面的一个文件,也是你数据的一致性快照。

​ 可以使用一下代码创建你的数据库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	// Open the my.db data file in your current directory.
	// It will be created if it doesn't exist.
	db, err := bolt.Open("my.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	...
}

​ 注意:Bolt 会给你的数据库文件加锁,所以多个进程不能同时打开数据库文件。打开一个已经 Open 的数据库文件会导致阻塞,直到其它进程释放这个数据库文件。为了防止无止尽的等待,你可以传递一个 timeout 选项,当你 Open 数据库的时候,如以下代码:

1
db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})
事务

​ Bolt 同一时刻只能有一个 read-write 事务,但是可以有多个 read-only 事务。当事务开始的时候,每个事物都会有一个一致性数据视图。

​ 单个事务和对象创建来自 bucket 是非线程安全的。在多 goroutines 的场景下必须给每一个 goroutines 开启事务或者使用锁确保同一时刻只有一个 goroutines 访问事务。创建事务是线程安全的。

Read-write 事务

​ 开启一个读写事务

1
2
3
4
err := db.Update(func(tx *bolt.Tx) error {
	...
	return nil
})

​ 在这个闭包函数里面,你有一个一致性数据视图。你可以提交事务通过 return nil 在最后一行,你也可以回滚事务在任何时间点,通过返回一个 error。

Read-only 事务

​ 开启一个只读事务

1
2
3
4
err := db.View(func(tx *bolt.Tx) error {
	...
	return nil
})

​ 在只读事务里面不能更改数据,只能对数据进行查询、检索等操作。

批量 read-write 事务
1
2
3
4
err := db.Batch(func(tx *bolt.Tx) error {
	...
	return nil
})
手动管理事务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Start a writable transaction.
tx, err := db.Begin(true)
if err != nil {
    return err
}
defer tx.Rollback()

// Use the transaction...
_, err := tx.CreateBucket([]byte("MyBucket"))
if err != nil {
    return err
}

// Commit the transaction and check for error.
if err := tx.Commit(); err != nil {
    return err
}

​ DB.Begin() 的参数必须为 true,如果我们开启的是一个写事务。

Buckets
创建 Bucket

​ Buckets 是一个数据集合,你可以类比为 mysql 的数据表。所有的 key 在 bucket 的里面必须唯一,可以使用 DB.CreateBucket() 函数创建一个 bucket

1
2
3
4
5
6
7
db.Update(func(tx *bolt.Tx) error {
	b, err := tx.CreateBucket([]byte("MyBucket"))
	if err != nil {
		return fmt.Errorf("create bucket: %s", err)
	}
	return nil
})

也可以使用 Tx.CreateBucketIfNotExists() 创建 bucket,仅仅当 bucket 不存在的时候才会创建,使用 Tx.DeleteBucket() 删除一个 bucket。

创建键值对

​ 使用 Bucket.Put() 存储键值对

1
2
3
4
5
db.Update(func(tx *bolt.Tx) error {
	b := tx.Bucket([]byte("MyBucket"))
	err := b.Put([]byte("answer"), []byte("42"))
	return err
})

​ 使用 Bucket.Get() 获取值

1
2
3
4
5
6
db.View(func(tx *bolt.Tx) error {
	b := tx.Bucket([]byte("MyBucket"))
	v := b.Get([]byte("answer"))
	fmt.Printf("The answer is: %s\n", v)
	return nil
})

​ Get() 不会返回任何错误,除非系统错误,这个函数确保任何时候都是正常工作的。如果 key 存在,它将返回 byte slice value,如果 key 不存在,它将返回 nil。如果你设置了一个 zero-length 的值,它是不同于 key 不存在。使用 Bucket.Delete() 删除一个值。

​ 注意:Get() 返回的值仅仅在事务里面有效,如果你想在事务外面使用,请使用 copy() 函数复制到另外一个 byte 类型的变量。

自增长值

​ 使用 NextSequence() 可以给你的键值对生成一个唯一序列号,如下例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// CreateUser saves u to the store. The new user ID is set on u once the data is persisted.
func (s *Store) CreateUser(u *User) error {
    return s.db.Update(func(tx *bolt.Tx) error {
        // Retrieve the users bucket.
        // This should be created when the DB is first opened.
        b := tx.Bucket([]byte("users"))

        // Generate ID for the user.
        // This returns an error only if the Tx is closed or not writeable.
        // That can't happen in an Update() call so I ignore the error check.
        id, _ := b.NextSequence()
        u.ID = int(id)

        // Marshal user data into bytes.
        buf, err := json.Marshal(u)
        if err != nil {
            return err
        }

        // Persist bytes to users bucket.
        return b.Put(itob(u.ID), buf)
    })
}

// itob returns an 8-byte big endian representation of v.
func itob(v int) []byte {
    b := make([]byte, 8)
    binary.BigEndian.PutUint64(b, uint64(v))
    return b
}

type User struct {
    ID int
    ...
}
迭代
使用 Cursor 迭代所有键值

​ 因为 Bucket 里面的数据都是字节序存储,所以迭代速度极快。

1
2
3
4
5
6
7
8
9
10
11
12
db.View(func(tx *bolt.Tx) error {
	// Assume bucket exists and has keys
	b := tx.Bucket([]byte("MyBucket"))

	c := b.Cursor()

	for k, v := c.First(); k != nil; k, v = c.Next() {
		fmt.Printf("key=%s, value=%s\n", k, v)
	}

	return nil
})

Cursor 可使用的函数:

1
2
3
4
5
First()  Move to the first key.
Last()   Move to the last key.
Seek()   Move to a specific key.
Next()   Move to the next key.
Prev()   Move to the previous key.

​ 如果迭代到末尾,Next() 将会返回 nil 值。在迭代期间,如果 key 非空,但是值是 nil,说明这个 key 关联的是一个 bucket 而非一个 value,使用 Bucket.Bucket() 访问子桶(Bucket 是可以嵌套的)。

前缀匹配
1
2
3
4
5
6
7
8
9
10
11
db.View(func(tx *bolt.Tx) error {
	// Assume bucket exists and has keys
	c := tx.Bucket([]byte("MyBucket")).Cursor()

	prefix := []byte("1234")
	for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
		fmt.Printf("key=%s, value=%s\n", k, v)
	}

	return nil
})
范围查找

​ 如果你使用可排序的时间编码,例如:RFC3339。那么你可以通过时间范围进行查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
db.View(func(tx *bolt.Tx) error {
	// Assume our events bucket exists and has RFC3339 encoded time keys.
	c := tx.Bucket([]byte("Events")).Cursor()

	// Our time range spans the 90's decade.
	min := []byte("1990-01-01T00:00:00Z")
	max := []byte("2000-01-01T00:00:00Z")

	// Iterate over the 90's.
	for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
		fmt.Printf("%s: %s\n", k, v)
	}

	return nil
})

​ 注意:RFC3339 是可排序的,Golang 实现 RFC3339Nano 在浮点数之后没有使用一个固定的数字,因此它是不可排序的。

ForEach()
1
2
3
4
5
6
7
8
9
10
db.View(func(tx *bolt.Tx) error {
	// Assume bucket exists and has keys
	b := tx.Bucket([]byte("MyBucket"))

	b.ForEach(func(k, v []byte) error {
		fmt.Printf("key=%s, value=%s\n", k, v)
		return nil
	})
	return nil
})

key,value 仅仅在事务里面有效,如果你想在事务外使用,请使用 copy() copy 到另一个 byte slice。

嵌套 Bucket

​ 你也可以创建一个嵌套 Bucket 在 DB 里面:

1
2
3
func (*Bucket) CreateBucket(key []byte) (*Bucket, error)
func (*Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error)
func (*Bucket) DeleteBucket(key []byte) error

如例子所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// createUser creates a new user in the given account.
func createUser(accountID int, u *User) error {
    // Start the transaction.
    tx, err := db.Begin(true)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Retrieve the root bucket for the account.
    // Assume this has already been created when the account was set up.
    root := tx.Bucket([]byte(strconv.FormatUint(accountID, 10)))

    // Setup the users bucket.
    bkt, err := root.CreateBucketIfNotExists([]byte("USERS"))
    if err != nil {
        return err
    }

    // Generate an ID for the new user.
    userID, err := bkt.NextSequence()
    if err != nil {
        return err
    }
    u.ID = userID

    // Marshal and save the encoded user.
    if buf, err := json.Marshal(u); err != nil {
        return err
    } else if err := bkt.Put([]byte(strconv.FormatUint(u.ID, 10)), buf); err != nil {
        return err
    }

    // Commit the transaction.
    if err := tx.Commit(); err != nil {
        return err
    }

    return nil
}
数据备份

​ 可以使用 Tx.WriteTo() 创建一个一致性视图到 writer。如果你调用这个函数在一个只读视图,它将会执行一个热备份,不会阻塞你的读写操作。

​ 默认会使用操作系统的 Page Cache,以做到更高效的备份。

​ 一个更通常的备份案例,你可以使用 Curl 工具进行备份:

1
2
3
4
5
6
7
8
9
10
11
12
func BackupHandleFunc(w http.ResponseWriter, req *http.Request) {
	err := db.View(func(tx *bolt.Tx) error {
		w.Header().Set("Content-Type", "application/octet-stream")
		w.Header().Set("Content-Disposition", `attachment; filename="my.db"`)
		w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
		_, err := tx.WriteTo(w)
		return err
	})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

​ 所以你可以使用命令进行备份:

1
curl http://localhost/backup > my.db

​ 或者你可以打开浏览器进行备份,打开这个链接 http://localhost/backup,会自动下载备份文件。

​ 你也可以使用 Tx.CopyFile() 备份到另一个文件。