43
43
import java .util .concurrent .Executors ;
44
44
import java .util .concurrent .TimeoutException ;
45
45
import java .util .concurrent .atomic .AtomicLong ;
46
+ import java .util .concurrent .locks .Lock ;
47
+ import java .util .concurrent .locks .ReadWriteLock ;
48
+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
46
49
import java .util .function .Consumer ;
47
50
import java .util .logging .Level ;
48
51
import java .util .logging .Logger ;
@@ -64,6 +67,7 @@ public class Connection implements Closeable {
64
67
private static final AtomicLong NEXT_ID = new AtomicLong (1L );
65
68
private final WebSocket socket ;
66
69
private final Map <Long , Consumer <Either <Throwable , JsonInput >>> methodCallbacks = new LinkedHashMap <>();
70
+ private final ReadWriteLock callbacksLock = new ReentrantReadWriteLock (true );
67
71
private final Multimap <Event <?>, Consumer <?>> eventCallbacks = HashMultimap .create ();
68
72
69
73
public Connection (HttpClient client , String url ) {
@@ -162,14 +166,22 @@ public <X> void addListener(Event<X> event, Consumer<X> handler) {
162
166
Require .nonNull ("Event to listen for" , event );
163
167
Require .nonNull ("Handler to call" , handler );
164
168
165
- synchronized (eventCallbacks ) {
169
+ Lock lock = callbacksLock .writeLock ();
170
+ lock .lock ();
171
+ try {
166
172
eventCallbacks .put (event , handler );
173
+ } finally {
174
+ lock .unlock ();
167
175
}
168
176
}
169
177
170
178
public void clearListeners () {
171
- synchronized (eventCallbacks ) {
179
+ Lock lock = callbacksLock .writeLock ();
180
+ lock .lock ();
181
+ try {
172
182
eventCallbacks .clear ();
183
+ } finally {
184
+ lock .unlock ();
173
185
}
174
186
}
175
187
@@ -233,7 +245,9 @@ private void handle(CharSequence data) {
233
245
LOG .log (
234
246
getDebugLogLevel (),
235
247
String .format ("Method %s called with %d callbacks available" , raw .get ("method" ), eventCallbacks .keySet ().size ()));
236
- synchronized (eventCallbacks ) {
248
+ Lock lock = callbacksLock .readLock ();
249
+ lock .lock ();
250
+ try {
237
251
// TODO: Also only decode once.
238
252
eventCallbacks .keySet ().stream ()
239
253
.peek (event -> LOG .log (
@@ -275,6 +289,8 @@ private void handle(CharSequence data) {
275
289
}
276
290
}
277
291
});
292
+ } finally {
293
+ lock .unlock ();
278
294
}
279
295
} else {
280
296
LOG .warning ("Unhandled type: " + data );
0 commit comments