golang的线程安全map

自从上次的文章之后就开始着手实现线程安全的字典,今天稍稍做一下整理把过程记录下来,虽然最新的go 1.9已经支持线程安全的map了😂

期间简单查了一些资料,因为之前有个说法,忘记是从哪里听来的了,就是“golang中的并发尽量使用channel来实现,避免使用锁,更加高效一些”于是当时就萌生了使用channel实现并发控制这么个逻辑,大概的框架应该是这样:

所有对于字典的访问(读/写)action以及一个response channel 都通过communicate channel传递给goroutine,之后goroutine将必要的结果回写给response channel,相当于是利用一个goroutine来对map进行读写,访问等一系列操作,同时只通过一个channel来与外界进行交互通过这种方法既能实现线程安全,又能保证没有使用锁,有些同学可能会说,你这就一个线程处理对于字典的访问,效率可靠吗?关于这种担心其实是没必要的,因为线程多了反而会增加竞争增加等待的时间,而且一个更好的例子就是redis本身就是单进程的你看redis慢吗?对吧:)

调研

想法萌生之后随后就去调研一些看有没有已经实现的先例,最终结果如下:

  1. 一篇关于golang并发访问map的博文,博主在14年的时候将这种想法做了记录但是并没有做出相应的实现,读完有一种相见恨晚的感觉
  2. 一段使用select控制并发访问的代码实现
  3. 一个使用lock实现的线程安全的map
  4. 一篇关于优化字典访问的博文

关于1中的博文这里就不再赘述,有兴趣的同学可以自己阅读一下原文,文章大意和我在上文写的是一样的:)

关于2中的代码感兴趣的同学可以看下里面monitor、和packetHandler两个函数的实现,很有借鉴意义

关于3的代码也很好理解它主要是在字典的外层包了一层sync的lock以及unlock,同时实现了对于字典的序列化

关于4的博文其主要介绍了在使用sync进行lock以及unlock的时候如何选择合适的时机最大化的提供效率

实现

查看完相关资料之后基本心里有数了,并且鉴于并没有一个类似的实现,还是决定自己动手做一个,下面贴一下代码,各位观看的大神看到不好的地方请轻拍:)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package rmap

import (
//"encoding/json"
//"log"
)
const (
MAP_INIT = 1024
SET = 1 << iota
GET
DEL
LEN
ITER
)

var MAX_GET int = 32
type KT interface{}
type VT interface{}
type KP *interface{}
type VP *interface{}
type CB chan bool
type MAP map[interface{}]interface{}

type data struct{
key KT
value VT
}
//type res struct{
// exist bool
//}

type message struct{
action int `signal operate map`
data
sync CB
}

type RoutineMap struct{
data MAP
chan_in chan *message
//chan_out chan VP
}

func NewMap()*RoutineMap{
r := new(RoutineMap)
r.init()
return r
}

func (r *RoutineMap)init(){
r.chan_in = make(chan *message,MAX_GET)
r.data = make(MAP,MAP_INIT)
r.manager_run()
//r.chan_out = make(VP,MAX_GET)
}
func (r *RoutineMap)manager_run(){
go func(){
for{
select{
case m := <- r.chan_in:
switch m.action{
case SET://unecessary sync
r.data[m.key] = m.value
case DEL://unecessary sync
delete(r.data,m.key)
case LEN://sync
m.value = len(r.data)
m.sync <- true
case GET://sync
v,e := r.data[m.key]
m.value = v
m.sync <- e
case ITER://sync
m.sync <- true
<- m.sync
}
}
}
}()
}
func send_message(r *RoutineMap,m *message){
r.chan_in <- m
}

//func wait_sync(m *message)bool{
// s := <- m.sync
// return s
//}

func wait_message(m *message)bool{
s := <- m.sync
return s
}

func send_sync(m *message){
m.sync <- true
}

func (r *RoutineMap)Set(k,v KT){
m := message{
action:SET,
data:data{
key:k,
value:v,
},
}
send_message(r,&m)
//r.chan_in <- &m
}
func (r *RoutineMap)Del(k KT){
m := message{
action:DEL,
data:data{
key:k,
},
}
send_message(r,&m)
//r.chan_in <- &m
}
func (r *RoutineMap)Len()int{
m := message{
action:LEN,
sync:make(CB,1),
}
defer close(m.sync)
send_message(r,&m)
//r.chan_in <- &m
wait_message(&m)
return m.value.(int)
}
func (r *RoutineMap)Get(k KT)(VT,bool){
m := message{
action:GET,
data:data{
key:k,
},
sync:make(CB,1),
}
defer close(m.sync)
send_message(r,&m)
exist := wait_message(&m)
return m.value,exist
}
func (r *RoutineMap)Keys()(kl []KT){
m := message{
action:ITER,
sync:make(CB,1),
}
defer close(m.sync)
send_message(r,&m)
wait_message(&m)
for k,_ := range r.data{
kl = append(kl,k)
}
m.sync <- true
return kl
}

func (r *RoutineMap)Values()(vl []VT){
m := message{
action:ITER,
sync:make(CB,1),
}
defer close(m.sync)
send_message(r,&m)
wait_message(&m)
for _,v := range r.data{
vl = append(vl,v)
}
m.sync <- true
return vl
}

func (r *RoutineMap)Copy()*RoutineMap{
m := message{
action:ITER,
sync:make(CB,1),
}
defer close(m.sync)
send_message(r,&m)
wait_message(&m)
rm := r
return rm
}
//还需要确认一下是不是直接访问data

经过

在实现的过程中,中间主要有两种思路进行了比较:

  1. goroutine执行对于字典的读、写,对外只暴露一个交互的channel
  2. channel长度为1将信号发送到channel导致channel满(这样当其他操作首先写入channel就会发生阻塞等待),保证了只有一个线程获取channel,在函数中实现对字典进行随意的访问、修改

后来比较了一下还是选择了第一种模型,首先来说第二种模型抽象性差很多,并且,这不就是把channel当lock来用吗。。。并且第一种还有一个好处,就是它十分像redis的模型,后续我甚至可以给这个lib加上过期功能的实现!

结尾

从代码上来看,我相信大家都能看懂,稍微有一些绕的地方主要就是这个使用channel+goroutine来进行同步控制这一步,目前尚未实现字典的序列化,以及字典的过期功能(其实我觉得这个功能很好用,在很多场景中甚至可以替代redis,毕竟如果只是为了一个过期功能给服务单独开一台redis还是很浪费的哈:)

简单介绍一下吧,如果有什么问题各位可以发邮件给我:)

字典主要支持如下几个action:

  1. set用以将k v保存到字典中,考虑到写并不需要反馈因此这里在获取到channel的访问权之后并没有等待goroutine写完
  2. del用以删除元素,异步原因同上
  3. LEN 获取字典的长度,这里考虑到对于字典的多线程访问随时会导致字典有修改,为了保证长度尽可能准确,因此采用的是sync方式在读取成功之后会阻塞,直至Len将结果取走
  4. get 获取键值对应的value,采用的同步,原因同上
  5. iter用以遍历整个字典,采用的同步,原因同上

话说,这个lib实现了还没几天,go 1.9的sync包就提供了线程安全的Map > <,我这也真是赶在了风口上啊,😂,它在里面的实现方式相对底层一些,使用了atomic包,这个包应该是从汇编级别保证了原子操作,效率应该是相对更高一些。

写到这里之后,我突然有了一个问题,那就是golang中channel的实现是怎么样子?我知道它是一个指针对应的一块区域,那么它又是怎么实现的嘞,这个需要在深究一下

转载请注明来源链接 http://just4fun.im/2017/10/23/golang的线程安全map/ 尊重知识,谢谢:)