1 package org.littleshoot.proxy;
2
3 import static org.jboss.netty.channel.Channels.pipeline;
4
5 import java.lang.management.ManagementFactory;
6 import java.net.InetSocketAddress;
7 import java.nio.channels.ClosedChannelException;
8 import java.util.Collection;
9 import java.util.HashSet;
10 import java.util.LinkedList;
11 import java.util.Map;
12 import java.util.Queue;
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicBoolean;
16
17 import javax.management.InstanceAlreadyExistsException;
18 import javax.management.MBeanRegistrationException;
19 import javax.management.MBeanServer;
20 import javax.management.MalformedObjectNameException;
21 import javax.management.NotCompliantMBeanException;
22 import javax.management.ObjectName;
23
24 import org.apache.commons.lang.StringUtils;
25 import org.jboss.netty.bootstrap.ClientBootstrap;
26 import org.jboss.netty.channel.Channel;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelFutureListener;
29 import org.jboss.netty.channel.ChannelHandlerContext;
30 import org.jboss.netty.channel.ChannelPipeline;
31 import org.jboss.netty.channel.ChannelPipelineFactory;
32 import org.jboss.netty.channel.ChannelStateEvent;
33 import org.jboss.netty.channel.ExceptionEvent;
34 import org.jboss.netty.channel.MessageEvent;
35 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
36 import org.jboss.netty.channel.group.ChannelGroup;
37 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
38 import org.jboss.netty.handler.codec.http.HttpChunk;
39 import org.jboss.netty.handler.codec.http.HttpMethod;
40 import org.jboss.netty.handler.codec.http.HttpRequest;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /***
45 * Class for handling all HTTP requests from the browser to the proxy.
46 *
47 * Note this class only ever handles a single connection from the browser.
48 * The browser can and will, however, send requests to multiple hosts using
49 * that same connection, i.e. it will send a request to host B once a request
50 * to host A has completed.
51 */
52 public class HttpRequestHandler extends SimpleChannelUpstreamHandler
53 implements RelayListener, ConnectionData {
54
55 private final static Logger log =
56 LoggerFactory.getLogger(HttpRequestHandler.class);
57 private volatile boolean readingChunks;
58
59 private static volatile int totalBrowserToProxyConnections = 0;
60 private volatile int browserToProxyConnections = 0;
61
62 private final Map<String, Queue<ChannelFuture>> externalHostsToChannelFutures =
63 new ConcurrentHashMap<String, Queue<ChannelFuture>>();
64
65 private volatile int messagesReceived = 0;
66
67 private volatile int unansweredRequestCount = 0;
68
69 private volatile int requestsSent = 0;
70
71 private volatile int responsesReceived = 0;
72
73 private final ProxyAuthorizationManager authorizationManager;
74
75 private final Set<String> answeredRequests = new HashSet<String>();
76 private final Set<String> unansweredRequests = new HashSet<String>();
77
78 private ChannelFuture currentChannelFuture;
79
80 /***
81 * This is just for debugging.
82 */
83 private final Queue<HttpRequest> requests =
84 new LinkedList<HttpRequest>();
85
86
87 /***
88 * Note, we *can* receive requests for multiple different sites from the
89 * same connection from the browser, so the host and port most certainly
90 * does change.
91 *
92 * Why do we need to store it? We need it to lookup the appropriate
93 * external connection to send HTTP chunks to.
94 */
95 private String hostAndPort;
96 private final String chainProxyHostAndPort;
97 private final ChannelGroup channelGroup;
98
99 private final ClientSocketChannelFactory clientChannelFactory;
100 private final ProxyCacheManager cacheManager;
101
102 private final AtomicBoolean browserChannelClosed = new AtomicBoolean(false);
103 private volatile boolean receivedChannelClosed = false;
104 private final boolean useJmx;
105
106 private final RelayPipelineFactoryFactory relayPipelineFactoryFactory;
107
108 /***
109 * Creates a new class for handling HTTP requests with no frills.
110 *
111 * @param clientChannelFactory The common channel factory for clients.
112 */
113 public HttpRequestHandler(
114 final ClientSocketChannelFactory clientChannelFactory,
115 final RelayPipelineFactoryFactory relayPipelineFactoryFactory) {
116 this(null, null, null, clientChannelFactory, null,
117 relayPipelineFactoryFactory, false);
118 }
119
120 /***
121 * Creates a new class for handling HTTP requests with the specified
122 * authentication manager.
123 *
124 * @param cacheManager The manager for the cache.
125 * @param authorizationManager The class that handles any
126 * proxy authentication requirements.
127 * @param channelGroup The group of channels for keeping track of all
128 * channels we've opened.
129 * @param filters HTTP filtering rules.
130 * @param clientChannelFactory The common channel factory for clients.
131 */
132 public HttpRequestHandler(final ProxyCacheManager cacheManager,
133 final ProxyAuthorizationManager authorizationManager,
134 final ChannelGroup channelGroup,
135 final ClientSocketChannelFactory clientChannelFactory,
136 final RelayPipelineFactoryFactory relayPipelineFactoryFactory) {
137 this(cacheManager, authorizationManager, channelGroup,
138 clientChannelFactory, null, relayPipelineFactoryFactory, false);
139 }
140
141 /***
142 * Creates a new class for handling HTTP requests with the specified
143 * authentication manager.
144 *
145 * @param cacheManager The manager for the cache.
146 * @param authorizationManager The class that handles any
147 * proxy authentication requirements.
148 * @param channelGroup The group of channels for keeping track of all
149 * channels we've opened.
150 * @param filters HTTP filtering rules.
151 * @param clientChannelFactory The common channel factory for clients.
152 * @param chainProxyHostAndPort upstream proxy server host and port or null
153 * if none used.
154 * @param requestFilter An optional filter for HTTP requests.
155 * @param useJmx Whether or not to expose debugging properties via JMX.
156 */
157 public HttpRequestHandler(final ProxyCacheManager cacheManager,
158 final ProxyAuthorizationManager authorizationManager,
159 final ChannelGroup channelGroup,
160 final ClientSocketChannelFactory clientChannelFactory,
161 final String chainProxyHostAndPort,
162 final RelayPipelineFactoryFactory relayPipelineFactoryFactory,
163 final boolean useJmx) {
164 this.cacheManager = cacheManager;
165 this.authorizationManager = authorizationManager;
166 this.channelGroup = channelGroup;
167 this.clientChannelFactory = clientChannelFactory;
168 this.chainProxyHostAndPort = chainProxyHostAndPort;
169 this.relayPipelineFactoryFactory = relayPipelineFactoryFactory;
170 this.useJmx = useJmx;
171 if (useJmx) {
172 setupJmx();
173 }
174 }
175
176
177 private void setupJmx() {
178 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
179 try {
180 final Class<? extends SimpleChannelUpstreamHandler> clazz =
181 getClass();
182 final String pack = clazz.getPackage().getName();
183 final String oName =
184 pack+":type="+clazz.getSimpleName()+"-"+clazz.getSimpleName() +
185 "-"+hashCode();
186 log.info("Registering MBean with name: {}", oName);
187 final ObjectName mxBeanName = new ObjectName(oName);
188 if(!mbs.isRegistered(mxBeanName)) {
189 mbs.registerMBean(this, mxBeanName);
190 }
191 } catch (final MalformedObjectNameException e) {
192 log.error("Could not set up JMX", e);
193 } catch (final InstanceAlreadyExistsException e) {
194 log.error("Could not set up JMX", e);
195 } catch (final MBeanRegistrationException e) {
196 log.error("Could not set up JMX", e);
197 } catch (final NotCompliantMBeanException e) {
198 log.error("Could not set up JMX", e);
199 }
200 }
201
202 @Override
203 public void messageReceived(final ChannelHandlerContext ctx,
204 final MessageEvent me) {
205 if (browserChannelClosed.get()) {
206 log.info("Ignoring message since the connection to the browser " +
207 "is about to close");
208 return;
209 }
210 messagesReceived++;
211 log.info("Received "+messagesReceived+" total messages");
212 if (!readingChunks) {
213 processMessage(ctx, me);
214 }
215 else {
216 processChunk(ctx, me);
217 }
218 }
219
220 private void processChunk(final ChannelHandlerContext ctx,
221 final MessageEvent me) {
222 log.info("Processing chunk...");
223 final HttpChunk chunk = (HttpChunk) me.getMessage();
224
225
226
227
228 if (chunk.isLast()) {
229 this.readingChunks = false;
230 }
231
232
233
234
235 if (this.currentChannelFuture.getChannel().isConnected()) {
236 this.currentChannelFuture.getChannel().write(chunk);
237 }
238 else {
239 this.currentChannelFuture.addListener(new ChannelFutureListener() {
240
241 public void operationComplete(final ChannelFuture future)
242 throws Exception {
243 currentChannelFuture.getChannel().write(chunk);
244 }
245 });
246 }
247 }
248
249 private void processMessage(final ChannelHandlerContext ctx,
250 final MessageEvent me) {
251
252 final HttpRequest request = (HttpRequest) me.getMessage();
253
254
255 final Channel inboundChannel = me.getChannel();
256 if (this.cacheManager != null &&
257 this.cacheManager.returnCacheHit((HttpRequest)me.getMessage(),
258 inboundChannel)) {
259 log.info("Found cache hit! Cache wrote the response.");
260 return;
261 }
262 this.unansweredRequestCount++;
263
264 log.info("Got request: {} on channel: "+inboundChannel, request);
265 if (this.authorizationManager != null &&
266 !this.authorizationManager.handleProxyAuthorization(request, ctx)) {
267 log.info("Not authorized!!");
268 return;
269 }
270
271 if (this.chainProxyHostAndPort != null) {
272 this.hostAndPort = this.chainProxyHostAndPort;
273 } else {
274 this.hostAndPort = ProxyUtils.parseHostAndPort(request);
275 }
276
277 final class OnConnect {
278 public ChannelFuture onConnect(final ChannelFuture cf) {
279 if (request.getMethod() != HttpMethod.CONNECT) {
280 final ChannelFuture writeFuture = cf.getChannel().write(request);
281 writeFuture.addListener(new ChannelFutureListener() {
282
283 public void operationComplete(final ChannelFuture future)
284 throws Exception {
285 if (useJmx) {
286 unansweredRequests.add(request.toString());
287 }
288 requestsSent++;
289 }
290 });
291 return writeFuture;
292 }
293 else {
294 writeConnectResponse(ctx, request, cf.getChannel());
295 return cf;
296 }
297 }
298 }
299
300 final OnConnect onConnect = new OnConnect();
301
302 final ChannelFuture curFuture = getChannelFuture();
303 if (curFuture != null) {
304 log.info("Using existing connection...");
305 this.currentChannelFuture = curFuture;
306 if (curFuture.getChannel().isConnected()) {
307 onConnect.onConnect(curFuture);
308 }
309 else {
310 final ChannelFutureListener cfl = new ChannelFutureListener() {
311 public void operationComplete(final ChannelFuture future)
312 throws Exception {
313 onConnect.onConnect(curFuture);
314 }
315 };
316 curFuture.addListener(cfl);
317 }
318 }
319 else {
320 log.info("Establishing new connection");
321 final ChannelFuture cf =
322 newChannelFuture(request, inboundChannel);
323 cf.addListener(new ChannelFutureListener() {
324 public void operationComplete(final ChannelFuture future)
325 throws Exception {
326 final Channel channel = future.getChannel();
327 if (channelGroup != null) {
328 channelGroup.add(channel);
329 }
330 if (future.isSuccess()) {
331 log.info("Connected successfully to: {}", channel);
332 log.info("Writing message on channel...");
333 final ChannelFuture wf = onConnect.onConnect(cf);
334 wf.addListener(new ChannelFutureListener() {
335 public void operationComplete(final ChannelFuture wcf)
336 throws Exception {
337 log.info("Finished write: "+wcf+ " to: "+
338 request.getMethod()+" "+
339 request.getUri());
340 }
341 });
342 }
343 else {
344 log.info("Could not connect to "+hostAndPort,
345 future.getCause());
346
347
348
349 onRelayChannelClose(inboundChannel, hostAndPort, 1,
350 true);
351 }
352 }
353 });
354 }
355
356 if (request.isChunked()) {
357 readingChunks = true;
358 }
359 }
360
361
362 public void onChannelAvailable(final String hostAndPortKey,
363 final ChannelFuture cf) {
364
365 synchronized (this.externalHostsToChannelFutures) {
366 final Queue<ChannelFuture> futures =
367 this.externalHostsToChannelFutures.get(hostAndPort);
368
369 final Queue<ChannelFuture> toUse;
370 if (futures == null) {
371 toUse = new LinkedList<ChannelFuture>();
372 this.externalHostsToChannelFutures.put(hostAndPort, toUse);
373 } else {
374 toUse = futures;
375 }
376 toUse.add(cf);
377 }
378 }
379
380 private ChannelFuture getChannelFuture() {
381 synchronized (this.externalHostsToChannelFutures) {
382 final Queue<ChannelFuture> futures =
383 this.externalHostsToChannelFutures.get(hostAndPort);
384 if (futures == null) {
385 return null;
386 }
387 if (futures.isEmpty()) {
388 return null;
389 }
390 final ChannelFuture cf = futures.remove();
391
392 if (cf != null && cf.isSuccess() &&
393 !cf.getChannel().isConnected()) {
394
395
396
397 removeProxyToWebConnection(hostAndPort);
398 return null;
399 }
400 return cf;
401 }
402 }
403
404 private void writeConnectResponse(final ChannelHandlerContext ctx,
405 final HttpRequest httpRequest, final Channel outgoingChannel) {
406 final int port = ProxyUtils.parsePort(httpRequest);
407 final Channel browserToProxyChannel = ctx.getChannel();
408
409
410
411
412 if (port < 0) {
413 log.warn("Connecting on port other than 443!!");
414 final String statusLine = "HTTP/1.1 502 Proxy Error\r\n";
415 ProxyUtils.writeResponse(browserToProxyChannel, statusLine,
416 ProxyUtils.PROXY_ERROR_HEADERS);
417 }
418 else {
419 browserToProxyChannel.setReadable(false);
420
421
422
423
424 ctx.getPipeline().remove("encoder");
425 ctx.getPipeline().remove("decoder");
426 ctx.getPipeline().remove("handler");
427
428
429
430
431 ctx.getPipeline().addLast("handler",
432 new HttpConnectRelayingHandler(outgoingChannel, this.channelGroup));
433
434 final String statusLine = "HTTP/1.1 200 Connection established\r\n";
435 ProxyUtils.writeResponse(browserToProxyChannel, statusLine,
436 ProxyUtils.CONNECT_OK_HEADERS);
437
438 browserToProxyChannel.setReadable(true);
439 }
440 }
441
442 private ChannelFuture newChannelFuture(final HttpRequest httpRequest,
443 final Channel browserToProxyChannel) {
444 final String host;
445 final int port;
446 if (hostAndPort.contains(":")) {
447 host = StringUtils.substringBefore(hostAndPort, ":");
448 final String portString =
449 StringUtils.substringAfter(hostAndPort, ":");
450 port = Integer.parseInt(portString);
451 }
452 else {
453 host = hostAndPort;
454 port = 80;
455 }
456
457
458 final ClientBootstrap cb = new ClientBootstrap(clientChannelFactory);
459
460 final ChannelPipelineFactory cpf;
461 if (httpRequest.getMethod() == HttpMethod.CONNECT) {
462
463
464
465 cpf = new ChannelPipelineFactory() {
466 public ChannelPipeline getPipeline() throws Exception {
467
468 final ChannelPipeline pipeline = pipeline();
469 pipeline.addLast("handler",
470 new HttpConnectRelayingHandler(browserToProxyChannel,
471 channelGroup));
472 return pipeline;
473 }
474 };
475 }
476 else {
477 cpf = relayPipelineFactoryFactory.getRelayPipelineFactory(
478 httpRequest, browserToProxyChannel, this);
479 }
480
481 cb.setPipelineFactory(cpf);
482 cb.setOption("connectTimeoutMillis", 40*1000);
483 log.info("Starting new connection to: {}", hostAndPort);
484 final ChannelFuture future =
485 cb.connect(new InetSocketAddress(host, port));
486 return future;
487 }
488
489 @Override
490 public void channelOpen(final ChannelHandlerContext ctx,
491 final ChannelStateEvent cse) throws Exception {
492 final Channel inboundChannel = cse.getChannel();
493 log.info("New channel opened: {}", inboundChannel);
494 totalBrowserToProxyConnections++;
495 browserToProxyConnections++;
496 log.info("Now "+totalBrowserToProxyConnections+
497 " browser to proxy channels...");
498 log.info("Now this class has "+browserToProxyConnections+
499 " browser to proxy channels...");
500
501
502 if (this.channelGroup != null) {
503 this.channelGroup.add(inboundChannel);
504 }
505 }
506
507 @Override
508 public void channelClosed(final ChannelHandlerContext ctx,
509 final ChannelStateEvent cse) {
510 log.info("Channel closed: {}", cse.getChannel());
511 totalBrowserToProxyConnections--;
512 browserToProxyConnections--;
513 log.info("Now "+totalBrowserToProxyConnections+
514 " total browser to proxy channels...");
515 log.info("Now this class has "+browserToProxyConnections+
516 " browser to proxy channels...");
517
518
519
520 if (browserToProxyConnections == 0) {
521 log.info("Closing all proxy to web channels for this browser " +
522 "to proxy connection!!!");
523 final Collection<Queue<ChannelFuture>> allFutures =
524 this.externalHostsToChannelFutures.values();
525 for (final Queue<ChannelFuture> futures : allFutures) {
526 for (final ChannelFuture future : futures) {
527 final Channel ch = future.getChannel();
528 if (ch.isOpen()) {
529 future.getChannel().close();
530 }
531 }
532 }
533 }
534 }
535
536 public void onRelayChannelClose(final Channel browserToProxyChannel,
537 final String key, final int unansweredRequestsOnChannel,
538 final boolean closedEndsResponseBody) {
539 if (closedEndsResponseBody) {
540 log.info("Close ends response body");
541 this.receivedChannelClosed = true;
542 }
543 log.info("this.receivedChannelClosed: "+this.receivedChannelClosed);
544 removeProxyToWebConnection(key);
545
546
547
548
549
550
551
552 this.unansweredRequestCount -= unansweredRequestsOnChannel;
553 if (this.receivedChannelClosed &&
554 (this.externalHostsToChannelFutures.isEmpty() || this.unansweredRequestCount == 0)) {
555 if (!browserChannelClosed.getAndSet(true)) {
556 log.info("Closing browser to proxy channel");
557 ProxyUtils.closeOnFlush(browserToProxyChannel);
558 }
559 }
560 else {
561 log.info("Not closing browser to proxy channel. Still "+
562 this.externalHostsToChannelFutures.size()+" connections and awaiting "+
563 this.unansweredRequestCount + " responses");
564 }
565 }
566
567
568 private void removeProxyToWebConnection(final String key) {
569
570 this.externalHostsToChannelFutures.remove(key);
571 }
572
573 public void onRelayHttpResponse(final Channel browserToProxyChannel,
574 final String key, final HttpRequest httpRequest) {
575 if (this.useJmx) {
576 this.answeredRequests.add(httpRequest.toString());
577 this.unansweredRequests.remove(httpRequest.toString());
578 }
579 this.unansweredRequestCount--;
580 this.responsesReceived++;
581
582
583
584 if (this.unansweredRequestCount == 0 && this.receivedChannelClosed) {
585 if (!browserChannelClosed.getAndSet(true)) {
586 log.info("Closing browser to proxy channel on HTTP response");
587 ProxyUtils.closeOnFlush(browserToProxyChannel);
588 }
589 }
590 else {
591 log.info("Not closing browser to proxy channel. Still "+
592 "awaiting " + this.unansweredRequestCount+" responses..." +
593 "receivedChannelClosed="+this.receivedChannelClosed);
594 }
595 }
596
597 @Override
598 public void exceptionCaught(final ChannelHandlerContext ctx,
599 final ExceptionEvent e) throws Exception {
600 final Channel channel = e.getChannel();
601 final Throwable cause = e.getCause();
602 if (cause instanceof ClosedChannelException) {
603 log.warn("Caught an exception on browser to proxy channel: "+
604 channel, cause);
605 }
606 else {
607 log.info("Caught an exception on browser to proxy channel: "+
608 channel, cause);
609 }
610 ProxyUtils.closeOnFlush(channel);
611 }
612
613 public int getClientConnections() {
614 return this.browserToProxyConnections;
615 }
616
617 public int getTotalClientConnections() {
618 return totalBrowserToProxyConnections;
619 }
620
621 public int getOutgoingConnections() {
622 return this.externalHostsToChannelFutures.size();
623 }
624
625 public int getRequestsSent() {
626 return this.requestsSent;
627 }
628
629 public int getResponsesReceived() {
630 return this.responsesReceived;
631 }
632
633 public String getUnansweredRequests() {
634 return this.unansweredRequests.toString();
635 }
636
637 public String getAnsweredReqeusts() {
638 return this.answeredRequests.toString();
639 }
640
641 public String getRequests() {
642 final StringBuilder sb = new StringBuilder();
643 for (final HttpRequest hr : requests) {
644 final String uri = hr.getUri();
645 sb.append(uri);
646 sb.append("\n");
647 }
648 return sb.toString();
649 }
650
651 }