BoltDB 是什么?
BoltDB 是一个纯粹的 k/v 存储数据库,本质上是一个本地的单机数据库系统。这个项目的目的是为了提供一个简单、快速、可信赖的数据库系统,在完全不需要诸如:mysql,postgres 数据库服务的情况下。
基本概念
创建数据库
DB 是最顶层的对象,它是你磁盘上面的一个文件,也是你数据的一致性快照。
可以使用一下代码创建你的数据库:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main
import (
"log"
"github.com/boltdb/bolt"
)
func main() {
// Open the my.db data file in your current directory.
// It will be created if it doesn't exist.
db, err := bolt.Open("my.db", 0600, nil)
if err != nil {
log.Fatal(err)
}
defer db.Close()
...
}
注意:Bolt 会给你的数据库文件加锁,所以多个进程不能同时打开数据库文件。打开一个已经 Open 的数据库文件会导致阻塞,直到其它进程释放这个数据库文件。为了防止无止尽的等待,你可以传递一个 timeout 选项,当你 Open 数据库的时候,如以下代码:
1
db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})
事务
Bolt 同一时刻只能有一个 read-write 事务,但是可以有多个 read-only 事务。当事务开始的时候,每个事物都会有一个一致性数据视图。
单个事务和对象创建来自 bucket 是非线程安全的。在多 goroutines 的场景下必须给每一个 goroutines 开启事务或者使用锁确保同一时刻只有一个 goroutines 访问事务。创建事务是线程安全的。
Read-write 事务
开启一个读写事务
1
2
3
4
err := db.Update(func(tx *bolt.Tx) error {
...
return nil
})
在这个闭包函数里面,你有一个一致性数据视图。你可以提交事务通过 return nil 在最后一行,你也可以回滚事务在任何时间点,通过返回一个 error。
Read-only 事务
开启一个只读事务
1
2
3
4
err := db.View(func(tx *bolt.Tx) error {
...
return nil
})
在只读事务里面不能更改数据,只能对数据进行查询、检索等操作。
批量 read-write 事务
1
2
3
4
err := db.Batch(func(tx *bolt.Tx) error {
...
return nil
})
手动管理事务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Start a writable transaction.
tx, err := db.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()
// Use the transaction...
_, err := tx.CreateBucket([]byte("MyBucket"))
if err != nil {
return err
}
// Commit the transaction and check for error.
if err := tx.Commit(); err != nil {
return err
}
DB.Begin() 的参数必须为 true,如果我们开启的是一个写事务。
Buckets
创建 Bucket
Buckets 是一个数据集合,你可以类比为 mysql 的数据表。所有的 key 在 bucket 的里面必须唯一,可以使用 DB.CreateBucket() 函数创建一个 bucket
1
2
3
4
5
6
7
db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("MyBucket"))
if err != nil {
return fmt.Errorf("create bucket: %s", err)
}
return nil
})
也可以使用 Tx.CreateBucketIfNotExists() 创建 bucket,仅仅当 bucket 不存在的时候才会创建,使用 Tx.DeleteBucket() 删除一个 bucket。
创建键值对
使用 Bucket.Put() 存储键值对
1
2
3
4
5
db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("MyBucket"))
err := b.Put([]byte("answer"), []byte("42"))
return err
})
使用 Bucket.Get() 获取值
1
2
3
4
5
6
db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("MyBucket"))
v := b.Get([]byte("answer"))
fmt.Printf("The answer is: %s\n", v)
return nil
})
Get() 不会返回任何错误,除非系统错误,这个函数确保任何时候都是正常工作的。如果 key 存在,它将返回 byte slice value,如果 key 不存在,它将返回 nil。如果你设置了一个 zero-length 的值,它是不同于 key 不存在。使用 Bucket.Delete() 删除一个值。
注意:Get() 返回的值仅仅在事务里面有效,如果你想在事务外面使用,请使用 copy() 函数复制到另外一个 byte 类型的变量。
自增长值
使用 NextSequence() 可以给你的键值对生成一个唯一序列号,如下例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// CreateUser saves u to the store. The new user ID is set on u once the data is persisted.
func (s *Store) CreateUser(u *User) error {
return s.db.Update(func(tx *bolt.Tx) error {
// Retrieve the users bucket.
// This should be created when the DB is first opened.
b := tx.Bucket([]byte("users"))
// Generate ID for the user.
// This returns an error only if the Tx is closed or not writeable.
// That can't happen in an Update() call so I ignore the error check.
id, _ := b.NextSequence()
u.ID = int(id)
// Marshal user data into bytes.
buf, err := json.Marshal(u)
if err != nil {
return err
}
// Persist bytes to users bucket.
return b.Put(itob(u.ID), buf)
})
}
// itob returns an 8-byte big endian representation of v.
func itob(v int) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(v))
return b
}
type User struct {
ID int
...
}
迭代
使用 Cursor 迭代所有键值
因为 Bucket 里面的数据都是字节序存储,所以迭代速度极快。
1
2
3
4
5
6
7
8
9
10
11
12
db.View(func(tx *bolt.Tx) error {
// Assume bucket exists and has keys
b := tx.Bucket([]byte("MyBucket"))
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
fmt.Printf("key=%s, value=%s\n", k, v)
}
return nil
})
Cursor 可使用的函数:
1
2
3
4
5
First() Move to the first key.
Last() Move to the last key.
Seek() Move to a specific key.
Next() Move to the next key.
Prev() Move to the previous key.
如果迭代到末尾,Next()
将会返回 nil 值。在迭代期间,如果 key 非空,但是值是 nil,说明这个 key 关联的是一个 bucket 而非一个 value,使用 Bucket.Bucket() 访问子桶(Bucket 是可以嵌套的)。
前缀匹配
1
2
3
4
5
6
7
8
9
10
11
db.View(func(tx *bolt.Tx) error {
// Assume bucket exists and has keys
c := tx.Bucket([]byte("MyBucket")).Cursor()
prefix := []byte("1234")
for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
fmt.Printf("key=%s, value=%s\n", k, v)
}
return nil
})
范围查找
如果你使用可排序的时间编码,例如:RFC3339。那么你可以通过时间范围进行查找。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
db.View(func(tx *bolt.Tx) error {
// Assume our events bucket exists and has RFC3339 encoded time keys.
c := tx.Bucket([]byte("Events")).Cursor()
// Our time range spans the 90's decade.
min := []byte("1990-01-01T00:00:00Z")
max := []byte("2000-01-01T00:00:00Z")
// Iterate over the 90's.
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
fmt.Printf("%s: %s\n", k, v)
}
return nil
})
注意:RFC3339 是可排序的,Golang 实现 RFC3339Nano 在浮点数之后没有使用一个固定的数字,因此它是不可排序的。
ForEach()
1
2
3
4
5
6
7
8
9
10
db.View(func(tx *bolt.Tx) error {
// Assume bucket exists and has keys
b := tx.Bucket([]byte("MyBucket"))
b.ForEach(func(k, v []byte) error {
fmt.Printf("key=%s, value=%s\n", k, v)
return nil
})
return nil
})
key,value 仅仅在事务里面有效,如果你想在事务外使用,请使用 copy() copy 到另一个 byte slice。
嵌套 Bucket
你也可以创建一个嵌套 Bucket 在 DB 里面:
1
2
3
func (*Bucket) CreateBucket(key []byte) (*Bucket, error)
func (*Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error)
func (*Bucket) DeleteBucket(key []byte) error
如例子所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// createUser creates a new user in the given account.
func createUser(accountID int, u *User) error {
// Start the transaction.
tx, err := db.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()
// Retrieve the root bucket for the account.
// Assume this has already been created when the account was set up.
root := tx.Bucket([]byte(strconv.FormatUint(accountID, 10)))
// Setup the users bucket.
bkt, err := root.CreateBucketIfNotExists([]byte("USERS"))
if err != nil {
return err
}
// Generate an ID for the new user.
userID, err := bkt.NextSequence()
if err != nil {
return err
}
u.ID = userID
// Marshal and save the encoded user.
if buf, err := json.Marshal(u); err != nil {
return err
} else if err := bkt.Put([]byte(strconv.FormatUint(u.ID, 10)), buf); err != nil {
return err
}
// Commit the transaction.
if err := tx.Commit(); err != nil {
return err
}
return nil
}
数据备份
可以使用 Tx.WriteTo() 创建一个一致性视图到 writer。如果你调用这个函数在一个只读视图,它将会执行一个热备份,不会阻塞你的读写操作。
默认会使用操作系统的 Page Cache,以做到更高效的备份。
一个更通常的备份案例,你可以使用 Curl 工具进行备份:
1
2
3
4
5
6
7
8
9
10
11
12
func BackupHandleFunc(w http.ResponseWriter, req *http.Request) {
err := db.View(func(tx *bolt.Tx) error {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", `attachment; filename="my.db"`)
w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
_, err := tx.WriteTo(w)
return err
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
所以你可以使用命令进行备份:
1
curl http://localhost/backup > my.db
或者你可以打开浏览器进行备份,打开这个链接 http://localhost/backup,会自动下载备份文件。
你也可以使用 Tx.CopyFile() 备份到另一个文件。