|
22 | 22 | import com.google.common.cache.Cache;
|
23 | 23 | import com.google.common.cache.CacheBuilder;
|
24 | 24 | import com.google.common.cache.RemovalListener;
|
| 25 | +import com.google.common.cache.RemovalNotification; |
25 | 26 | import com.google.common.collect.ImmutableList;
|
26 | 27 | import com.google.common.collect.ImmutableMap;
|
27 | 28 |
|
@@ -183,31 +184,7 @@ private LocalNode(
|
183 | 184 | this.currentSessions = CacheBuilder.newBuilder()
|
184 | 185 | .expireAfterAccess(sessionTimeout)
|
185 | 186 | .ticker(ticker)
|
186 |
| - .removalListener((RemovalListener<SessionId, SessionSlot>) notification -> { |
187 |
| - if (notification.getKey() != null && notification.getValue() != null) { |
188 |
| - SessionSlot slot = notification.getValue(); |
189 |
| - SessionId id = notification.getKey(); |
190 |
| - if (notification.wasEvicted()) { |
191 |
| - // Session is timing out, stopping it by sending a DELETE |
192 |
| - LOG.log(Level.INFO, () -> String.format("Session id %s timed out, stopping...", id)); |
193 |
| - slot.execute(new HttpRequest(DELETE, "/session/" + id)); |
194 |
| - } |
195 |
| - // Attempt to stop the session |
196 |
| - slot.stop(); |
197 |
| - // Invalidate temp file system |
198 |
| - this.tempFileSystems.invalidate(id); |
199 |
| - // Decrement pending sessions if Node is draining |
200 |
| - if (this.isDraining()) { |
201 |
| - int done = pendingSessions.decrementAndGet(); |
202 |
| - if (done <= 0) { |
203 |
| - LOG.info("Node draining complete!"); |
204 |
| - bus.fire(new NodeDrainComplete(this.getId())); |
205 |
| - } |
206 |
| - } |
207 |
| - } else { |
208 |
| - LOG.log(Debug.getDebugLogLevel(), "Received stop session notification with null values"); |
209 |
| - } |
210 |
| - }) |
| 187 | + .removalListener(this::stopTimedOutSession) |
211 | 188 | .build();
|
212 | 189 |
|
213 | 190 | ScheduledExecutorService sessionCleanupNodeService =
|
@@ -250,6 +227,36 @@ private LocalNode(
|
250 | 227 | new JMXHelper().register(this);
|
251 | 228 | }
|
252 | 229 |
|
| 230 | + private void stopTimedOutSession(RemovalNotification<SessionId, SessionSlot> notification) { |
| 231 | + if (notification.getKey() != null && notification.getValue() != null) { |
| 232 | + SessionSlot slot = notification.getValue(); |
| 233 | + SessionId id = notification.getKey(); |
| 234 | + if (notification.wasEvicted()) { |
| 235 | + // Session is timing out, stopping it by sending a DELETE |
| 236 | + LOG.log(Level.INFO, () -> String.format("Session id %s timed out, stopping...", id)); |
| 237 | + try { |
| 238 | + slot.execute(new HttpRequest(DELETE, "/session/" + id)); |
| 239 | + } catch (Exception e) { |
| 240 | + LOG.log(Level.WARNING, String.format("Exception while trying to stop session %s", id), e); |
| 241 | + } |
| 242 | + } |
| 243 | + // Attempt to stop the session |
| 244 | + slot.stop(); |
| 245 | + // Invalidate temp file system |
| 246 | + this.tempFileSystems.invalidate(id); |
| 247 | + // Decrement pending sessions if Node is draining |
| 248 | + if (this.isDraining()) { |
| 249 | + int done = pendingSessions.decrementAndGet(); |
| 250 | + if (done <= 0) { |
| 251 | + LOG.info("Node draining complete!"); |
| 252 | + bus.fire(new NodeDrainComplete(this.getId())); |
| 253 | + } |
| 254 | + } |
| 255 | + } else { |
| 256 | + LOG.log(Debug.getDebugLogLevel(), "Received stop session notification with null values"); |
| 257 | + } |
| 258 | + } |
| 259 | + |
253 | 260 | public static Builder builder(
|
254 | 261 | Tracer tracer,
|
255 | 262 | EventBus bus,
|
|
0 commit comments