View Javadoc

1   package org.littleshoot.proxy;
2   
3   import static org.jboss.netty.channel.Channels.pipeline;
4   
5   import java.lang.management.ManagementFactory;
6   import java.net.InetSocketAddress;
7   import java.nio.channels.ClosedChannelException;
8   import java.util.Collection;
9   import java.util.HashSet;
10  import java.util.LinkedList;
11  import java.util.Map;
12  import java.util.Queue;
13  import java.util.Set;
14  import java.util.concurrent.ConcurrentHashMap;
15  import java.util.concurrent.atomic.AtomicBoolean;
16  
17  import javax.management.InstanceAlreadyExistsException;
18  import javax.management.MBeanRegistrationException;
19  import javax.management.MBeanServer;
20  import javax.management.MalformedObjectNameException;
21  import javax.management.NotCompliantMBeanException;
22  import javax.management.ObjectName;
23  
24  import org.apache.commons.lang.StringUtils;
25  import org.jboss.netty.bootstrap.ClientBootstrap;
26  import org.jboss.netty.channel.Channel;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelFutureListener;
29  import org.jboss.netty.channel.ChannelHandlerContext;
30  import org.jboss.netty.channel.ChannelPipeline;
31  import org.jboss.netty.channel.ChannelPipelineFactory;
32  import org.jboss.netty.channel.ChannelStateEvent;
33  import org.jboss.netty.channel.ExceptionEvent;
34  import org.jboss.netty.channel.MessageEvent;
35  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
36  import org.jboss.netty.channel.group.ChannelGroup;
37  import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
38  import org.jboss.netty.handler.codec.http.HttpChunk;
39  import org.jboss.netty.handler.codec.http.HttpMethod;
40  import org.jboss.netty.handler.codec.http.HttpRequest;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  /***
45   * Class for handling all HTTP requests from the browser to the proxy.
46   * 
47   * Note this class only ever handles a single connection from the browser.
48   * The browser can and will, however, send requests to multiple hosts using
49   * that same connection, i.e. it will send a request to host B once a request
50   * to host A has completed.
51   */
52  public class HttpRequestHandler extends SimpleChannelUpstreamHandler 
53      implements RelayListener, ConnectionData {
54  
55      private final static Logger log = 
56          LoggerFactory.getLogger(HttpRequestHandler.class);
57      private volatile boolean readingChunks;
58      
59      private static volatile int totalBrowserToProxyConnections = 0;
60      private volatile int browserToProxyConnections = 0;
61      
62      private final Map<String, Queue<ChannelFuture>> externalHostsToChannelFutures = 
63          new ConcurrentHashMap<String, Queue<ChannelFuture>>();
64      
65      private volatile int messagesReceived = 0;
66      
67      private volatile int unansweredRequestCount = 0;
68      
69      private volatile int requestsSent = 0;
70      
71      private volatile int responsesReceived = 0;
72      
73      private final ProxyAuthorizationManager authorizationManager;
74      
75      private final Set<String> answeredRequests = new HashSet<String>();
76      private final Set<String> unansweredRequests = new HashSet<String>();
77  
78      private ChannelFuture currentChannelFuture;
79      
80      /***
81       * This is just for debugging.
82       */
83      private final Queue<HttpRequest> requests = 
84          new LinkedList<HttpRequest>();
85      
86      
87      /***
88       * Note, we *can* receive requests for multiple different sites from the
89       * same connection from the browser, so the host and port most certainly
90       * does change.
91       * 
92       * Why do we need to store it? We need it to lookup the appropriate 
93       * external connection to send HTTP chunks to.
94       */
95      private String hostAndPort;
96      private final String chainProxyHostAndPort;
97      private final ChannelGroup channelGroup;
98  
99      private final ClientSocketChannelFactory clientChannelFactory;
100     private final ProxyCacheManager cacheManager;
101     
102     private final AtomicBoolean browserChannelClosed = new AtomicBoolean(false);
103     private volatile boolean receivedChannelClosed = false;
104     private final boolean useJmx;
105     
106     private final RelayPipelineFactoryFactory relayPipelineFactoryFactory;
107     
108     /***
109      * Creates a new class for handling HTTP requests with no frills.
110      * 
111      * @param clientChannelFactory The common channel factory for clients.
112      */
113     public HttpRequestHandler(
114         final ClientSocketChannelFactory clientChannelFactory,
115         final RelayPipelineFactoryFactory relayPipelineFactoryFactory) {
116         this(null, null, null, clientChannelFactory, null, 
117             relayPipelineFactoryFactory, false);
118     }
119     
120     /***
121      * Creates a new class for handling HTTP requests with the specified
122      * authentication manager.
123      * 
124      * @param cacheManager The manager for the cache. 
125      * @param authorizationManager The class that handles any 
126      * proxy authentication requirements.
127      * @param channelGroup The group of channels for keeping track of all
128      * channels we've opened.
129      * @param filters HTTP filtering rules.
130      * @param clientChannelFactory The common channel factory for clients.
131      */
132     public HttpRequestHandler(final ProxyCacheManager cacheManager, 
133         final ProxyAuthorizationManager authorizationManager, 
134         final ChannelGroup channelGroup, 
135         final ClientSocketChannelFactory clientChannelFactory,
136         final RelayPipelineFactoryFactory relayPipelineFactoryFactory) {
137         this(cacheManager, authorizationManager, channelGroup,
138             clientChannelFactory, null, relayPipelineFactoryFactory, false);
139     }
140     
141     /***
142      * Creates a new class for handling HTTP requests with the specified
143      * authentication manager.
144      * 
145      * @param cacheManager The manager for the cache. 
146      * @param authorizationManager The class that handles any 
147      * proxy authentication requirements.
148      * @param channelGroup The group of channels for keeping track of all
149      * channels we've opened.
150      * @param filters HTTP filtering rules.
151      * @param clientChannelFactory The common channel factory for clients.
152      * @param chainProxyHostAndPort upstream proxy server host and port or null 
153      * if none used.
154      * @param requestFilter An optional filter for HTTP requests.
155      * @param useJmx Whether or not to expose debugging properties via JMX.
156      */
157     public HttpRequestHandler(final ProxyCacheManager cacheManager, 
158         final ProxyAuthorizationManager authorizationManager, 
159         final ChannelGroup channelGroup, 
160         final ClientSocketChannelFactory clientChannelFactory,
161         final String chainProxyHostAndPort, 
162         final RelayPipelineFactoryFactory relayPipelineFactoryFactory,
163         final boolean useJmx) {
164         this.cacheManager = cacheManager;
165         this.authorizationManager = authorizationManager;
166         this.channelGroup = channelGroup;
167         this.clientChannelFactory = clientChannelFactory;
168         this.chainProxyHostAndPort = chainProxyHostAndPort;
169         this.relayPipelineFactoryFactory = relayPipelineFactoryFactory;
170         this.useJmx = useJmx;
171         if (useJmx) {
172             setupJmx();
173         }
174     }
175 
176 
177     private void setupJmx() {
178         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
179         try {
180             final Class<? extends SimpleChannelUpstreamHandler> clazz = 
181                 getClass();
182             final String pack = clazz.getPackage().getName();
183             final String oName =
184                 pack+":type="+clazz.getSimpleName()+"-"+clazz.getSimpleName() + 
185                 "-"+hashCode();
186             log.info("Registering MBean with name: {}", oName);
187             final ObjectName mxBeanName = new ObjectName(oName);
188             if(!mbs.isRegistered(mxBeanName)) {
189                 mbs.registerMBean(this, mxBeanName);
190             }
191         } catch (final MalformedObjectNameException e) {
192             log.error("Could not set up JMX", e);
193         } catch (final InstanceAlreadyExistsException e) {
194             log.error("Could not set up JMX", e);
195         } catch (final MBeanRegistrationException e) {
196             log.error("Could not set up JMX", e);
197         } catch (final NotCompliantMBeanException e) {
198             log.error("Could not set up JMX", e);
199         }
200     }
201     
202     @Override
203     public void messageReceived(final ChannelHandlerContext ctx, 
204         final MessageEvent me) {
205         if (browserChannelClosed.get()) {
206             log.info("Ignoring message since the connection to the browser " +
207                 "is about to close");
208             return;
209         }
210         messagesReceived++;
211         log.info("Received "+messagesReceived+" total messages");
212         if (!readingChunks) {
213             processMessage(ctx, me);
214         } 
215         else {
216             processChunk(ctx, me);
217         }
218     }
219 
220     private void processChunk(final ChannelHandlerContext ctx, 
221         final MessageEvent me) {
222         log.info("Processing chunk...");
223         final HttpChunk chunk = (HttpChunk) me.getMessage();
224         
225         // Remember this will typically be a persistent connection, so we'll
226         // get another request after we're read the last chunk. So we need to
227         // reset it back to no longer read in chunk mode.
228         if (chunk.isLast()) {
229             this.readingChunks = false;
230         }
231         
232         // We don't necessarily know the channel is connected yet!! This can
233         // happen if the client sends a chunk directly after the initial 
234         // request.
235         if (this.currentChannelFuture.getChannel().isConnected()) {
236             this.currentChannelFuture.getChannel().write(chunk);
237         }
238         else {
239             this.currentChannelFuture.addListener(new ChannelFutureListener() {
240                 
241                 public void operationComplete(final ChannelFuture future) 
242                     throws Exception {
243                     currentChannelFuture.getChannel().write(chunk);
244                 }
245             });
246         }
247     }
248     
249     private void processMessage(final ChannelHandlerContext ctx, 
250         final MessageEvent me) {
251         
252         final HttpRequest request = (HttpRequest) me.getMessage();
253         //requests.add(request);
254         
255         final Channel inboundChannel = me.getChannel();
256         if (this.cacheManager != null &&
257             this.cacheManager.returnCacheHit((HttpRequest)me.getMessage(), 
258             inboundChannel)) {
259             log.info("Found cache hit! Cache wrote the response.");
260             return;
261         }
262         this.unansweredRequestCount++;
263         
264         log.info("Got request: {} on channel: "+inboundChannel, request);
265         if (this.authorizationManager != null && 
266             !this.authorizationManager.handleProxyAuthorization(request, ctx)) {
267             log.info("Not authorized!!");
268             return;
269         }
270         
271         if (this.chainProxyHostAndPort != null) {
272             this.hostAndPort = this.chainProxyHostAndPort;
273         } else {
274             this.hostAndPort = ProxyUtils.parseHostAndPort(request);
275         }
276         
277         final class OnConnect {
278             public ChannelFuture onConnect(final ChannelFuture cf) {
279                 if (request.getMethod() != HttpMethod.CONNECT) {
280                     final ChannelFuture writeFuture = cf.getChannel().write(request);
281                     writeFuture.addListener(new ChannelFutureListener() {
282                         
283                         public void operationComplete(final ChannelFuture future) 
284                             throws Exception {
285                             if (useJmx) {
286                                 unansweredRequests.add(request.toString());
287                             }
288                             requestsSent++;
289                         }
290                     });
291                     return writeFuture;
292                 }
293                 else {
294                     writeConnectResponse(ctx, request, cf.getChannel());
295                     return cf;
296                 }
297             }
298         }
299      
300         final OnConnect onConnect = new OnConnect();
301         
302         final ChannelFuture curFuture = getChannelFuture();
303         if (curFuture != null) {
304             log.info("Using existing connection...");
305             this.currentChannelFuture = curFuture;
306             if (curFuture.getChannel().isConnected()) {
307                 onConnect.onConnect(curFuture);
308             }
309             else {
310                 final ChannelFutureListener cfl = new ChannelFutureListener() {
311                     public void operationComplete(final ChannelFuture future)
312                         throws Exception {
313                         onConnect.onConnect(curFuture);
314                     }
315                 };
316                 curFuture.addListener(cfl);
317             }
318         }
319         else {
320             log.info("Establishing new connection");
321             final ChannelFuture cf = 
322                 newChannelFuture(request, inboundChannel);
323             cf.addListener(new ChannelFutureListener() {
324                 public void operationComplete(final ChannelFuture future)
325                     throws Exception {
326                     final Channel channel = future.getChannel();
327                     if (channelGroup != null) {
328                         channelGroup.add(channel);
329                     }
330                     if (future.isSuccess()) {
331                         log.info("Connected successfully to: {}", channel);
332                         log.info("Writing message on channel...");
333                         final ChannelFuture wf = onConnect.onConnect(cf);
334                         wf.addListener(new ChannelFutureListener() {
335                             public void operationComplete(final ChannelFuture wcf)
336                                 throws Exception {
337                                 log.info("Finished write: "+wcf+ " to: "+
338                                     request.getMethod()+" "+
339                                     request.getUri());
340                             }
341                         });
342                     }
343                     else {
344                         log.info("Could not connect to "+hostAndPort, 
345                             future.getCause());
346                         
347                         // We call the relay channel closed event handler
348                         // with one associated unanswered request.
349                         onRelayChannelClose(inboundChannel, hostAndPort, 1, 
350                             true);
351                     }
352                 }
353             });
354         }
355             
356         if (request.isChunked()) {
357             readingChunks = true;
358         }
359     }
360     
361     
362     public void onChannelAvailable(final String hostAndPortKey, 
363         final ChannelFuture cf) {
364         
365         synchronized (this.externalHostsToChannelFutures) {
366             final Queue<ChannelFuture> futures = 
367                 this.externalHostsToChannelFutures.get(hostAndPort);
368             
369             final Queue<ChannelFuture> toUse;
370             if (futures == null) {
371                 toUse = new LinkedList<ChannelFuture>();
372                 this.externalHostsToChannelFutures.put(hostAndPort, toUse);
373             } else {
374                 toUse = futures;
375             }
376             toUse.add(cf);
377         }
378     }
379 
380     private ChannelFuture getChannelFuture() {
381         synchronized (this.externalHostsToChannelFutures) {
382             final Queue<ChannelFuture> futures = 
383                 this.externalHostsToChannelFutures.get(hostAndPort);
384             if (futures == null) {
385                 return null;
386             }
387             if (futures.isEmpty()) {
388                 return null;
389             }
390             final ChannelFuture cf = futures.remove();
391 
392             if (cf != null && cf.isSuccess() && 
393                 !cf.getChannel().isConnected()) {
394                 // In this case, the future successfully connected at one
395                 // time, but we're no longer connected. We need to remove the
396                 // channel and open a new one.
397                 removeProxyToWebConnection(hostAndPort);
398                 return null;
399             }
400             return cf;
401         }
402     }
403 
404     private void writeConnectResponse(final ChannelHandlerContext ctx, 
405         final HttpRequest httpRequest, final Channel outgoingChannel) {
406         final int port = ProxyUtils.parsePort(httpRequest);
407         final Channel browserToProxyChannel = ctx.getChannel();
408         
409         // TODO: We should really only allow access on 443, but this breaks
410         // what a lot of browsers do in practice.
411         //if (port != 443) {
412         if (port < 0) {
413             log.warn("Connecting on port other than 443!!");
414             final String statusLine = "HTTP/1.1 502 Proxy Error\r\n";
415             ProxyUtils.writeResponse(browserToProxyChannel, statusLine, 
416                 ProxyUtils.PROXY_ERROR_HEADERS);
417         }
418         else {
419             browserToProxyChannel.setReadable(false);
420             
421             // We need to modify both the pipeline encoders and decoders for the
422             // browser to proxy channel *and* the encoders and decoders for the
423             // proxy to external site channel.
424             ctx.getPipeline().remove("encoder");
425             ctx.getPipeline().remove("decoder");
426             ctx.getPipeline().remove("handler");
427             
428             // Note there are two HttpConnectRelayingHandler for each HTTP
429             // CONNECT tunnel -- one writing to the browser, and one writing
430             // to the remote host.
431             ctx.getPipeline().addLast("handler", 
432                 new HttpConnectRelayingHandler(outgoingChannel, this.channelGroup));
433             
434             final String statusLine = "HTTP/1.1 200 Connection established\r\n";
435             ProxyUtils.writeResponse(browserToProxyChannel, statusLine, 
436                 ProxyUtils.CONNECT_OK_HEADERS);
437             
438             browserToProxyChannel.setReadable(true);
439         }
440     }
441 
442     private ChannelFuture newChannelFuture(final HttpRequest httpRequest, 
443         final Channel browserToProxyChannel) {
444         final String host;
445         final int port;
446         if (hostAndPort.contains(":")) {
447             host = StringUtils.substringBefore(hostAndPort, ":");
448             final String portString = 
449                 StringUtils.substringAfter(hostAndPort, ":");
450             port = Integer.parseInt(portString);
451         }
452         else {
453             host = hostAndPort;
454             port = 80;
455         }
456         
457         // Configure the client.
458         final ClientBootstrap cb = new ClientBootstrap(clientChannelFactory);
459         
460         final ChannelPipelineFactory cpf;
461         if (httpRequest.getMethod() == HttpMethod.CONNECT) {
462             // In the case of CONNECT, we just want to relay all data in both 
463             // directions. We SHOULD make sure this is traffic on a reasonable
464             // port, however, such as 80 or 443, to reduce security risks.
465             cpf = new ChannelPipelineFactory() {
466                 public ChannelPipeline getPipeline() throws Exception {
467                     // Create a default pipeline implementation.
468                     final ChannelPipeline pipeline = pipeline();
469                     pipeline.addLast("handler", 
470                         new HttpConnectRelayingHandler(browserToProxyChannel,
471                             channelGroup));
472                     return pipeline;
473                 }
474             };
475         }
476         else {
477             cpf = relayPipelineFactoryFactory.getRelayPipelineFactory(
478                 httpRequest, browserToProxyChannel, this);
479         }
480             
481         cb.setPipelineFactory(cpf);
482         cb.setOption("connectTimeoutMillis", 40*1000);
483         log.info("Starting new connection to: {}", hostAndPort);
484         final ChannelFuture future = 
485             cb.connect(new InetSocketAddress(host, port));
486         return future;
487     }
488     
489     @Override
490     public void channelOpen(final ChannelHandlerContext ctx, 
491         final ChannelStateEvent cse) throws Exception {
492         final Channel inboundChannel = cse.getChannel();
493         log.info("New channel opened: {}", inboundChannel);
494         totalBrowserToProxyConnections++;
495         browserToProxyConnections++;
496         log.info("Now "+totalBrowserToProxyConnections+
497             " browser to proxy channels...");
498         log.info("Now this class has "+browserToProxyConnections+
499             " browser to proxy channels...");
500         
501         // We need to keep track of the channel so we can close it at the end.
502         if (this.channelGroup != null) {
503             this.channelGroup.add(inboundChannel);
504         }
505     }
506     
507     @Override
508     public void channelClosed(final ChannelHandlerContext ctx, 
509         final ChannelStateEvent cse) {
510         log.info("Channel closed: {}", cse.getChannel());
511         totalBrowserToProxyConnections--;
512         browserToProxyConnections--;
513         log.info("Now "+totalBrowserToProxyConnections+
514             " total browser to proxy channels...");
515         log.info("Now this class has "+browserToProxyConnections+
516             " browser to proxy channels...");
517         
518         // The following should always be the case with
519         // @ChannelPipelineCoverage("one")
520         if (browserToProxyConnections == 0) {
521             log.info("Closing all proxy to web channels for this browser " +
522                 "to proxy connection!!!");
523             final Collection<Queue<ChannelFuture>> allFutures = 
524                 this.externalHostsToChannelFutures.values();
525             for (final Queue<ChannelFuture> futures : allFutures) {
526                 for (final ChannelFuture future : futures) {
527                     final Channel ch = future.getChannel();
528                     if (ch.isOpen()) {
529                         future.getChannel().close();
530                     }
531                 }
532             }
533         }
534     }
535     
536     public void onRelayChannelClose(final Channel browserToProxyChannel, 
537         final String key, final int unansweredRequestsOnChannel,
538         final boolean closedEndsResponseBody) {
539         if (closedEndsResponseBody) {
540             log.info("Close ends response body");
541             this.receivedChannelClosed = true;
542         }
543         log.info("this.receivedChannelClosed: "+this.receivedChannelClosed);
544         removeProxyToWebConnection(key);
545         
546         // The closed channel may have had outstanding requests we haven't 
547         // properly accounted for. The channel closing effectively marks those
548         // requests as "answered" when the responses didn't contain any other
549         // markers for complete responses, such as Content-Length or the the
550         // last chunk of a chunked encoding. All of this potentially results 
551         // in the closing of the client/browser connection here.
552         this.unansweredRequestCount -= unansweredRequestsOnChannel;
553         if (this.receivedChannelClosed && 
554             (this.externalHostsToChannelFutures.isEmpty() || this.unansweredRequestCount == 0)) {
555             if (!browserChannelClosed.getAndSet(true)) {
556                 log.info("Closing browser to proxy channel");
557                 ProxyUtils.closeOnFlush(browserToProxyChannel);
558             }
559         }
560         else {
561             log.info("Not closing browser to proxy channel. Still "+
562                 this.externalHostsToChannelFutures.size()+" connections and awaiting "+
563                 this.unansweredRequestCount + " responses");
564         }
565     }
566     
567 
568     private void removeProxyToWebConnection(final String key) {
569         // It's probably already been removed at this point, but just in case.
570         this.externalHostsToChannelFutures.remove(key);
571     }
572 
573     public void onRelayHttpResponse(final Channel browserToProxyChannel,
574         final String key, final HttpRequest httpRequest) {
575         if (this.useJmx) {
576             this.answeredRequests.add(httpRequest.toString());
577             this.unansweredRequests.remove(httpRequest.toString());
578         }
579         this.unansweredRequestCount--;
580         this.responsesReceived++;
581         // If we've received responses to all outstanding requests and one
582         // of those outgoing channels has been closed, we should close the
583         // connection to the browser.
584         if (this.unansweredRequestCount == 0 && this.receivedChannelClosed) {
585             if (!browserChannelClosed.getAndSet(true)) {
586                 log.info("Closing browser to proxy channel on HTTP response");
587                 ProxyUtils.closeOnFlush(browserToProxyChannel);
588             }
589         }
590         else {
591             log.info("Not closing browser to proxy channel. Still "+
592                 "awaiting " + this.unansweredRequestCount+" responses..." +
593                 "receivedChannelClosed="+this.receivedChannelClosed);
594         }
595     }
596     
597     @Override
598     public void exceptionCaught(final ChannelHandlerContext ctx, 
599         final ExceptionEvent e) throws Exception {
600         final Channel channel = e.getChannel();
601         final Throwable cause = e.getCause();
602         if (cause instanceof ClosedChannelException) {
603             log.warn("Caught an exception on browser to proxy channel: "+
604                 channel, cause);
605         }
606         else {
607             log.info("Caught an exception on browser to proxy channel: "+
608                 channel, cause);
609         }
610         ProxyUtils.closeOnFlush(channel);
611     }
612 
613     public int getClientConnections() {
614         return this.browserToProxyConnections;
615     }
616     
617     public int getTotalClientConnections() {
618         return totalBrowserToProxyConnections;
619     }
620 
621     public int getOutgoingConnections() {
622         return this.externalHostsToChannelFutures.size();
623     }
624 
625     public int getRequestsSent() {
626         return this.requestsSent;
627     }
628 
629     public int getResponsesReceived() {
630         return this.responsesReceived;
631     }
632 
633     public String getUnansweredRequests() {
634         return this.unansweredRequests.toString();
635     }
636 
637     public String getAnsweredReqeusts() {
638         return this.answeredRequests.toString();
639     }
640 
641     public String getRequests() {
642         final StringBuilder sb = new StringBuilder();
643         for (final HttpRequest hr : requests) {
644             final String uri = hr.getUri();
645             sb.append(uri);
646             sb.append("\n");
647         }
648         return sb.toString();
649     }
650 
651 }