33
33
import org .openqa .selenium .grid .data .NewSessionErrorResponse ;
34
34
import org .openqa .selenium .grid .data .NewSessionRejectedEvent ;
35
35
import org .openqa .selenium .grid .data .NewSessionRequestEvent ;
36
- import org .openqa .selenium .grid .data .NewSessionResponse ;
37
- import org .openqa .selenium .grid .data .NewSessionResponseEvent ;
38
36
import org .openqa .selenium .grid .data .NodeAddedEvent ;
39
37
import org .openqa .selenium .grid .data .NodeDrainComplete ;
40
38
import org .openqa .selenium .grid .data .NodeHeartBeatEvent ;
@@ -113,14 +111,14 @@ public class LocalDistributor extends Distributor {
113
111
private final GridModel model ;
114
112
private final Map <NodeId , Node > nodes ;
115
113
116
- private final NewSessionQueue sessionRequests ;
114
+ private final NewSessionQueue sessionQueue ;
117
115
118
116
public LocalDistributor (
119
117
Tracer tracer ,
120
118
EventBus bus ,
121
119
HttpClient .Factory clientFactory ,
122
120
SessionMap sessions ,
123
- NewSessionQueue sessionRequests ,
121
+ NewSessionQueue sessionQueue ,
124
122
Secret registrationSecret ,
125
123
Duration healthcheckInterval ) {
126
124
super (tracer , clientFactory , new DefaultSlotSelector (), sessions , registrationSecret );
@@ -130,7 +128,7 @@ public LocalDistributor(
130
128
this .sessions = Require .nonNull ("Session map" , sessions );
131
129
this .model = new GridModel (bus );
132
130
this .nodes = new ConcurrentHashMap <>();
133
- this .sessionRequests = Require .nonNull ("New Session Request Queue" , sessionRequests );
131
+ this .sessionQueue = Require .nonNull ("New Session Request Queue" , sessionQueue );
134
132
this .registrationSecret = Require .nonNull ("Registration secret" , registrationSecret );
135
133
this .healthcheckInterval = Require .nonNull ("Health check interval" , healthcheckInterval );
136
134
@@ -169,15 +167,14 @@ public static Distributor create(Config config) {
169
167
HttpClient .Factory clientFactory = new NetworkOptions (config ).getHttpClientFactory (tracer );
170
168
SessionMap sessions = new SessionMapOptions (config ).getSessionMap ();
171
169
SecretOptions secretOptions = new SecretOptions (config );
172
- NewSessionQueue sessionRequests =
173
- new NewSessionQueueOptions (config ).getSessionQueue (
174
- "org.openqa.selenium.grid.sessionqueue.remote.RemoteNewSessionQueue" );
170
+ NewSessionQueue sessionQueue = new NewSessionQueueOptions (config ).getSessionQueue (
171
+ "org.openqa.selenium.grid.sessionqueue.remote.RemoteNewSessionQueue" );
175
172
return new LocalDistributor (
176
173
tracer ,
177
174
bus ,
178
175
clientFactory ,
179
176
sessions ,
180
- sessionRequests ,
177
+ sessionQueue ,
181
178
secretOptions .getRegistrationSecret (),
182
179
distributorOptions .getHealthCheckInterval ());
183
180
}
@@ -393,7 +390,7 @@ public void run() {
393
390
if (hasCapacity ) {
394
391
RequestId reqId = requestIds .poll ();
395
392
if (reqId != null ) {
396
- Optional <SessionRequest > maybeRequest = sessionRequests .remove (reqId );
393
+ Optional <SessionRequest > maybeRequest = sessionQueue .remove (reqId );
397
394
// Check if polling the queue did not return null
398
395
if (maybeRequest .isPresent ()) {
399
396
handleNewSessionRequest (maybeRequest .get (), reqId );
@@ -422,34 +419,20 @@ private void handleNewSessionRequest(SessionRequest sessionRequest, RequestId re
422
419
EventAttribute .setValue (reqId .toString ()));
423
420
424
421
attributeMap .put ("request" , EventAttribute .setValue (sessionRequest .toString ()));
425
- Either <SessionNotCreatedException , CreateSessionResponse > response =
426
- newSession (sessionRequest );
427
- if (response .isRight ()) {
428
- CreateSessionResponse sessionResponse = response .right ();
429
- NewSessionResponse newSessionResponse =
430
- new NewSessionResponse (
431
- reqId ,
432
- sessionResponse .getSession (),
433
- sessionResponse .getDownstreamEncodedResponse ());
434
-
435
- bus .fire (new NewSessionResponseEvent (newSessionResponse ));
436
- } else {
437
- SessionNotCreatedException exception = response .left ();
422
+ Either <SessionNotCreatedException , CreateSessionResponse > response = newSession (sessionRequest );
438
423
439
- if (exception instanceof RetrySessionRequestException ) {
440
- boolean retried = sessionRequests .retryAddToQueue (sessionRequest );
424
+ if (response . isLeft () ) {
425
+ boolean retried = sessionQueue .retryAddToQueue (sessionRequest );
441
426
442
- attributeMap .put ("request.retry_add" , EventAttribute .setValue (retried ));
443
- span .addEvent ("Retry adding to front of queue. No slot available." , attributeMap );
427
+ attributeMap .put ("request.retry_add" , EventAttribute .setValue (retried ));
428
+ span .addEvent ("Retry adding to front of queue. No slot available." , attributeMap );
444
429
445
- if (!retried ) {
446
- span .addEvent ("Retry adding to front of queue failed." , attributeMap );
447
- fireSessionRejectedEvent (exception .getMessage (), reqId );
448
- }
449
- } else {
450
- fireSessionRejectedEvent (exception .getMessage (), reqId );
430
+ if (retried ) {
431
+ return ;
451
432
}
452
433
}
434
+
435
+ sessionQueue .complete (reqId , response );
453
436
}
454
437
}
455
438
0 commit comments