浏览代码

Make Profile copying and encoding thread-safe. (#232)

Added some multi-threaded tests that allow the race detector
to detect race conditions in pprof internals.
Sanjay Ghemawat 7 年前
父节点
当前提交
f0300aaaf0
共有 3 个文件被更改,包括 61 次插入10 次删除
  1. 18
    0
      internal/driver/webui_test.go
  2. 16
    10
      profile/profile.go
  3. 27
    0
      profile/profile_test.go

+ 18
- 0
internal/driver/webui_test.go 查看文件

23
 	"net/url"
23
 	"net/url"
24
 	"os/exec"
24
 	"os/exec"
25
 	"regexp"
25
 	"regexp"
26
+	"sync"
26
 	"testing"
27
 	"testing"
27
 
28
 
28
 	"github.com/google/pprof/internal/plugin"
29
 	"github.com/google/pprof/internal/plugin"
101
 		}
102
 		}
102
 	}
103
 	}
103
 
104
 
105
+	// Also fetch all the test case URLs in parallel to test thread
106
+	// safety when run under the race detector.
107
+	var wg sync.WaitGroup
108
+	for _, c := range testcases {
109
+		if c.needDot && !haveDot {
110
+			continue
111
+		}
112
+		path := server.URL + c.path
113
+		for count := 0; count < 2; count++ {
114
+			wg.Add(1)
115
+			go func() {
116
+				http.Get(path)
117
+				wg.Done()
118
+			}()
119
+		}
120
+	}
121
+	wg.Wait()
104
 }
122
 }
105
 
123
 
106
 // Implement fake object file support.
124
 // Implement fake object file support.

+ 16
- 10
profile/profile.go 查看文件

26
 	"regexp"
26
 	"regexp"
27
 	"sort"
27
 	"sort"
28
 	"strings"
28
 	"strings"
29
+	"sync"
29
 	"time"
30
 	"time"
30
 )
31
 )
31
 
32
 
47
 	PeriodType    *ValueType
48
 	PeriodType    *ValueType
48
 	Period        int64
49
 	Period        int64
49
 
50
 
51
+	// The following fields are modified during encoding and copying,
52
+	// so are protected by a Mutex.
53
+	encodeMu sync.Mutex
54
+
50
 	commentX           []int64
55
 	commentX           []int64
51
 	dropFramesX        int64
56
 	dropFramesX        int64
52
 	keepFramesX        int64
57
 	keepFramesX        int64
296
 	}
301
 	}
297
 }
302
 }
298
 
303
 
299
-// Write writes the profile as a gzip-compressed marshaled protobuf.
300
-func (p *Profile) Write(w io.Writer) error {
304
+func serialize(p *Profile) []byte {
305
+	p.encodeMu.Lock()
301
 	p.preEncode()
306
 	p.preEncode()
302
 	b := marshal(p)
307
 	b := marshal(p)
308
+	p.encodeMu.Unlock()
309
+	return b
310
+}
311
+
312
+// Write writes the profile as a gzip-compressed marshaled protobuf.
313
+func (p *Profile) Write(w io.Writer) error {
303
 	zw := gzip.NewWriter(w)
314
 	zw := gzip.NewWriter(w)
304
 	defer zw.Close()
315
 	defer zw.Close()
305
-	_, err := zw.Write(b)
316
+	_, err := zw.Write(serialize(p))
306
 	return err
317
 	return err
307
 }
318
 }
308
 
319
 
309
 // WriteUncompressed writes the profile as a marshaled protobuf.
320
 // WriteUncompressed writes the profile as a marshaled protobuf.
310
 func (p *Profile) WriteUncompressed(w io.Writer) error {
321
 func (p *Profile) WriteUncompressed(w io.Writer) error {
311
-	p.preEncode()
312
-	b := marshal(p)
313
-	_, err := w.Write(b)
322
+	_, err := w.Write(serialize(p))
314
 	return err
323
 	return err
315
 }
324
 }
316
 
325
 
605
 
614
 
606
 // Copy makes a fully independent copy of a profile.
615
 // Copy makes a fully independent copy of a profile.
607
 func (p *Profile) Copy() *Profile {
616
 func (p *Profile) Copy() *Profile {
608
-	p.preEncode()
609
-	b := marshal(p)
610
-
611
 	pp := &Profile{}
617
 	pp := &Profile{}
612
-	if err := unmarshal(b, pp); err != nil {
618
+	if err := unmarshal(serialize(p), pp); err != nil {
613
 		panic(err)
619
 		panic(err)
614
 	}
620
 	}
615
 	if err := pp.postDecode(); err != nil {
621
 	if err := pp.postDecode(); err != nil {

+ 27
- 0
profile/profile_test.go 查看文件

21
 	"path/filepath"
21
 	"path/filepath"
22
 	"regexp"
22
 	"regexp"
23
 	"strings"
23
 	"strings"
24
+	"sync"
24
 	"testing"
25
 	"testing"
25
 
26
 
26
 	"github.com/google/pprof/internal/proftest"
27
 	"github.com/google/pprof/internal/proftest"
694
 		t.Errorf("got %s for main", testProfile1.Mapping[0].File)
695
 		t.Errorf("got %s for main", testProfile1.Mapping[0].File)
695
 	}
696
 	}
696
 }
697
 }
698
+
699
+// parallel runs n copies of fn in parallel.
700
+func parallel(n int, fn func()) {
701
+	var wg sync.WaitGroup
702
+	wg.Add(n)
703
+	for i := 0; i < n; i++ {
704
+		go func() {
705
+			fn()
706
+			wg.Done()
707
+		}()
708
+	}
709
+	wg.Wait()
710
+}
711
+
712
+func TestThreadSafety(t *testing.T) {
713
+	src := testProfile1.Copy()
714
+	parallel(4, func() { src.Copy() })
715
+	parallel(4, func() {
716
+		var b bytes.Buffer
717
+		src.WriteUncompressed(&b)
718
+	})
719
+	parallel(4, func() {
720
+		var b bytes.Buffer
721
+		src.Write(&b)
722
+	})
723
+}