Bladeren bron

Chunk profile fetching

pprof fetches profiles concurrently, which allows to profile multiple running
servers concurrently. However, this may translate into a large use of memory
if many profiles are merged, as pprof attempts to decode all profiles in parallel.

Limit the concurrency by chunking the concurrent fetches in groups of up to
64 profiles.
Raul Silvera 9 jaren geleden
bovenliggende
commit
039e3813be
1 gewijzigde bestanden met toevoegingen van 102 en 48 verwijderingen
  1. 102
    48
      internal/driver/fetch.go

+ 102
- 48
internal/driver/fetch.go Bestand weergeven

@@ -39,10 +39,31 @@ func fetchProfiles(s *source, o *plugin.Options) (*profile.Profile, error) {
39 39
 		return nil, err
40 40
 	}
41 41
 
42
-	p, msrcs, save, err := concurrentGrab(s, o.Fetch, o.Obj, o.UI)
42
+	sources := make([]profileSource, 0, len(s.Sources)+len(s.Base))
43
+	for _, src := range s.Sources {
44
+		sources = append(sources, profileSource{
45
+			addr:   src,
46
+			source: s,
47
+			scale:  1,
48
+		})
49
+	}
50
+	for _, src := range s.Base {
51
+		sources = append(sources, profileSource{
52
+			addr:   src,
53
+			source: s,
54
+			scale:  -1,
55
+		})
56
+	}
57
+	p, msrcs, save, cnt, err := chunkedGrab(sources, o.Fetch, o.Obj, o.UI)
43 58
 	if err != nil {
44 59
 		return nil, err
45 60
 	}
61
+	if cnt == 0 {
62
+		return nil, fmt.Errorf("failed to fetch any profiles")
63
+	}
64
+	if want, got := len(sources), cnt; want != got {
65
+		o.UI.PrintErr(fmt.Sprintf("fetched %d profiles out of %d", got, want))
66
+	}
46 67
 
47 68
 	// Symbolize the merged profile.
48 69
 	if err := o.Sym.Symbolize(s.Symbolize, msrcs, p); err != nil {
@@ -79,68 +100,91 @@ func fetchProfiles(s *source, o *plugin.Options) (*profile.Profile, error) {
79 100
 	return p, nil
80 101
 }
81 102
 
103
+// chunkedGrab fetches the profiles described in source and merges them into
104
+// a single profile. It fetches a chunk of profiles concurrently, with a maximum
105
+// chunk size to limit its memory usage.
106
+func chunkedGrab(sources []profileSource, fetch plugin.Fetcher, obj plugin.ObjTool, ui plugin.UI) (*profile.Profile, plugin.MappingSources, bool, int, error) {
107
+	const chunkSize = 64
108
+
109
+	var p *profile.Profile
110
+	var msrc plugin.MappingSources
111
+	var save bool
112
+	var count int
113
+
114
+	for start := 0; start < len(sources); start += chunkSize {
115
+		end := start + chunkSize
116
+		if end > len(sources) {
117
+			end = len(sources)
118
+		}
119
+		chunkP, chunkMsrc, chunkSave, chunkCount, chunkErr := concurrentGrab(sources[start:end], fetch, obj, ui)
120
+		switch {
121
+		case chunkErr != nil:
122
+			return nil, nil, false, 0, chunkErr
123
+		case chunkP == nil:
124
+			continue
125
+		case p == nil:
126
+			p, msrc, save, count = chunkP, chunkMsrc, chunkSave, chunkCount
127
+		default:
128
+			p, msrc, chunkErr = combineProfiles([]*profile.Profile{p, chunkP}, []plugin.MappingSources{msrc, chunkMsrc})
129
+			if chunkErr != nil {
130
+				return nil, nil, false, 0, chunkErr
131
+			}
132
+			if chunkSave {
133
+				save = true
134
+			}
135
+			count += chunkCount
136
+		}
137
+	}
138
+	return p, msrc, save, count, nil
139
+}
140
+
82 141
 // concurrentGrab fetches multiple profiles concurrently
83
-func concurrentGrab(s *source, fetch plugin.Fetcher, obj plugin.ObjTool, ui plugin.UI) (*profile.Profile, plugin.MappingSources, bool, error) {
142
+func concurrentGrab(sources []profileSource, fetch plugin.Fetcher, obj plugin.ObjTool, ui plugin.UI) (*profile.Profile, plugin.MappingSources, bool, int, error) {
84 143
 	wg := sync.WaitGroup{}
85
-	numprofs := len(s.Sources) + len(s.Base)
86
-	profs := make([]*profile.Profile, numprofs)
87
-	msrcs := make([]plugin.MappingSources, numprofs)
88
-	remote := make([]bool, numprofs)
89
-	errs := make([]error, numprofs)
90
-	for i, source := range s.Sources {
91
-		wg.Add(1)
92
-		go func(i int, src string) {
144
+	wg.Add(len(sources))
145
+	for i := range sources {
146
+		go func(s *profileSource) {
93 147
 			defer wg.Done()
94
-			profs[i], msrcs[i], remote[i], errs[i] = grabProfile(s, src, 1, fetch, obj, ui)
95
-		}(i, source)
96
-	}
97
-	for i, source := range s.Base {
98
-		wg.Add(1)
99
-		go func(i int, src string) {
100
-			defer wg.Done()
101
-			profs[i], msrcs[i], remote[i], errs[i] = grabProfile(s, src, -1, fetch, obj, ui)
102
-		}(i+len(s.Sources), source)
148
+			s.p, s.msrc, s.remote, s.err = grabProfile(s.source, s.addr, s.scale, fetch, obj, ui)
149
+		}(&sources[i])
103 150
 	}
104 151
 	wg.Wait()
152
+
105 153
 	var save bool
106
-	var numFailed = 0
107
-	for i, src := range s.Sources {
108
-		if errs[i] != nil {
109
-			ui.PrintErr(src + ": " + errs[i].Error())
110
-			numFailed++
111
-		}
112
-		save = save || remote[i]
113
-	}
114
-	for i, src := range s.Base {
115
-		b := i + len(s.Sources)
116
-		if errs[b] != nil {
117
-			ui.PrintErr(src + ": " + errs[b].Error())
118
-			numFailed++
154
+	profiles := make([]*profile.Profile, 0, len(sources))
155
+	msrcs := make([]plugin.MappingSources, 0, len(sources))
156
+	for i := range sources {
157
+		s := &sources[i]
158
+		if err := s.err; err != nil {
159
+			ui.PrintErr(s.addr + ": " + err.Error())
160
+			continue
119 161
 		}
120
-		save = save || remote[b]
121
-	}
122
-	if numFailed == numprofs {
123
-		return nil, nil, false, fmt.Errorf("failed to fetch any profiles")
162
+		save = save || s.remote
163
+		profiles = append(profiles, s.p)
164
+		msrcs = append(msrcs, s.msrc)
165
+		*s = profileSource{}
124 166
 	}
125
-	if numFailed > 0 {
126
-		ui.PrintErr(fmt.Sprintf("fetched %d profiles out of %d", numprofs-numFailed, numprofs))
167
+
168
+	if len(profiles) == 0 {
169
+		return nil, nil, false, 0, nil
127 170
 	}
128 171
 
129
-	scaled := make([]*profile.Profile, 0, numprofs)
130
-	for _, p := range profs {
131
-		if p != nil {
132
-			scaled = append(scaled, p)
133
-		}
172
+	p, msrc, err := combineProfiles(profiles, msrcs)
173
+	if err != nil {
174
+		return nil, nil, false, 0, err
134 175
 	}
176
+	return p, msrc, save, len(profiles), nil
177
+}
135 178
 
179
+func combineProfiles(profiles []*profile.Profile, msrcs []plugin.MappingSources) (*profile.Profile, plugin.MappingSources, error) {
136 180
 	// Merge profiles.
137
-	if err := measurement.ScaleProfiles(scaled); err != nil {
138
-		return nil, nil, false, err
181
+	if err := measurement.ScaleProfiles(profiles); err != nil {
182
+		return nil, nil, err
139 183
 	}
140 184
 
141
-	p, err := profile.Merge(scaled)
185
+	p, err := profile.Merge(profiles)
142 186
 	if err != nil {
143
-		return nil, nil, false, err
187
+		return nil, nil, err
144 188
 	}
145 189
 
146 190
 	// Combine mapping sources.
@@ -150,8 +194,18 @@ func concurrentGrab(s *source, fetch plugin.Fetcher, obj plugin.ObjTool, ui plug
150 194
 			msrc[m] = append(msrc[m], s...)
151 195
 		}
152 196
 	}
197
+	return p, msrc, nil
198
+}
199
+
200
+type profileSource struct {
201
+	addr   string
202
+	source *source
203
+	scale  float64
153 204
 
154
-	return p, msrc, save, nil
205
+	p      *profile.Profile
206
+	msrc   plugin.MappingSources
207
+	remote bool
208
+	err    error
155 209
 }
156 210
 
157 211
 // setTmpDir sets the PPROF_TMPDIR environment variable with a new