/**
 * Queues [call] for asynchronous execution and then tries to start as many
 * queued calls as the current limits allow.
 */
internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    // Park the call in the ready queue first; promotion happens below.
    readyAsyncCalls.add(call)

    // Mutate the AsyncCall so that it shares the per-host counter of an existing
    // call to the same host. Web socket calls are left untouched.
    if (!call.call.forWebSocket) {
      findExistingCallWithHost(call.host)?.let { sameHostCall ->
        call.reuseCallsPerHostFrom(sameHostCall)
      }
    }
  }
  promoteAndExecute()
}
/**
 * Moves eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and hands
 * them to the executor service. The hand-off happens outside the lock.
 *
 * @return true if, after promotion, `runningCallsCount()` is greater than zero.
 */
private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()

  val promoted = mutableListOf<AsyncCall>()
  val anyRunning: Boolean
  synchronized(this) {
    val pending = readyAsyncCalls.iterator()
    while (pending.hasNext()) {
      val candidate = pending.next()

      // Stop scanning once the global running-call limit is reached.
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.

      // A call whose host is already at its limit stays queued; keep scanning.
      if (candidate.callsPerHost.get() < this.maxRequestsPerHost) {
        pending.remove()
        candidate.callsPerHost.incrementAndGet()
        promoted += candidate
        runningAsyncCalls.add(candidate)
      } // else: Host max capacity.
    }
    anyRunning = runningCallsCount() > 0
  }

  // Submit the promoted calls in the order they were dequeued.
  promoted.forEach { readyCall -> readyCall.executeOn(executorService) }

  return anyRunning
}
/**
 * Attempts to enqueue this async call on [executorService].
 *
 * If the executor rejects the task, the failure is reported to the callback as
 * an [InterruptedIOException] and the dispatcher is told this call has finished.
 */
fun executeOn(executorService: ExecutorService) {
  client.dispatcher.assertThreadDoesntHoldLock()

  var submitted = false
  try {
    executorService.execute(this)
    submitted = true
  } catch (rejection: RejectedExecutionException) {
    val failure = InterruptedIOException("executor rejected").apply { initCause(rejection) }
    noMoreExchanges(failure)
    responseCallback.onFailure(this@RealCall, failure)
  } finally {
    // Runs for any failure to submit, not only RejectedExecutionException.
    if (!submitted) {
      client.dispatcher.finished(this) // This call is no longer running!
    }
  }
}
/**
 * Executes the call: obtains the response through the interceptor chain and
 * delivers either the response or a failure to [responseCallback]. The
 * dispatcher is always notified when the work is done.
 */
override fun run() {
  threadName("OkHttp ${redactedUrl()}") {
    // Tracks whether onResponse() has been invoked so a failure thrown from
    // inside the callback is not reported back to that same callback.
    var callbackNotified = false
    timeout.enter()
    try {
      val response = getResponseWithInterceptorChain()
      callbackNotified = true
      responseCallback.onResponse(this@RealCall, response)
    } catch (e: IOException) {
      if (!callbackNotified) {
        responseCallback.onFailure(this@RealCall, e)
      } else {
        // Do not signal the callback twice!
        Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
      }
    } catch (t: Throwable) {
      cancel()
      if (!callbackNotified) {
        val canceledException = IOException("canceled due to $t").apply { addSuppressed(t) }
        responseCallback.onFailure(this@RealCall, canceledException)
      }
      // Non-IO failures are still propagated to the executing thread.
      throw t
    } finally {
      client.dispatcher.finished(this)
    }
  }
}
/**
 * Assembles the full interceptor stack, runs the original request through it
 * and returns the resulting response.
 *
 * @throws IOException if the chain fails, or with message "Canceled" when the
 *     call was canceled while the chain was running.
 */
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors. Order matters: application interceptors
  // first, then the built-in ones, then network interceptors (skipped for web
  // sockets), and finally the interceptor that talks to the server.
  val stack = mutableListOf<Interceptor>()
  stack.addAll(client.interceptors)
  stack.add(RetryAndFollowUpInterceptor(client))
  stack.add(BridgeInterceptor(client.cookieJar))
  stack.add(CacheInterceptor(client.cache))
  stack.add(ConnectInterceptor)
  if (!forWebSocket) {
    stack.addAll(client.networkInterceptors)
  }
  stack.add(CallServerInterceptor(forWebSocket))

  // The chain starts at index 0 and carries every interceptor with it.
  val chain = RealInterceptorChain(
    call = this,
    interceptors = stack,
    index = 0,
    exchange = null,
    request = originalRequest,
    connectTimeoutMillis = client.connectTimeoutMillis,
    readTimeoutMillis = client.readTimeoutMillis,
    writeTimeoutMillis = client.writeTimeoutMillis
  )

  // Ensures noMoreExchanges() is invoked exactly once on every exit path.
  var exchangesReleased = false
  try {
    val response = chain.proceed(originalRequest)
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }
    return response
  } catch (e: IOException) {
    exchangesReleased = true
    val failure = noMoreExchanges(e) as Throwable
    throw failure
  } finally {
    if (!exchangesReleased) {
      noMoreExchanges(null)
    }
  }
}
/**
 * Passes [request] to the interceptor at the current [index], giving it a chain
 * positioned at the following index, and returns that interceptor's response.
 *
 * When an exchange is present, additional checks require that the previous
 * interceptor kept the same host and port and called proceed() exactly once.
 *
 * @throws IllegalStateException if any of the chain invariants is violated.
 * @throws NullPointerException if the interceptor returns null.
 */
@Throws(IOException::class)
override fun proceed(request: Request): Response {
  check(index < interceptors.size)

  calls++

  val activeExchange = exchange
  if (activeExchange != null) {
    check(activeExchange.finder.sameHostAndPort(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }
    check(calls == 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }
  }

  // Call the next interceptor in the chain. The copy advances the index so that
  // the interceptor's own proceed() call reaches the interceptor after it.
  val nextChain = copy(index = index + 1, request = request)
  val interceptor = interceptors[index]

  @Suppress("USELESS_ELVIS")
  val response = interceptor.intercept(nextChain) ?: throw NullPointerException(
    "interceptor $interceptor returned null"
  )

  if (activeExchange != null) {
    val isLastInterceptor = index + 1 >= interceptors.size
    check(isLastInterceptor || nextChain.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"
    }
  }

  check(response.body != null) { "interceptor $interceptor returned a response with no body" }

  return response
}
/** Disk-backed LRU store that holds the cached entries. */
internal val cache = DiskLruCache(
  fileSystem = fileSystem,
  directory = directory,
  appVersion = VERSION,
  valueCount = ENTRY_COUNT,
  maxSize = maxSize,
  taskRunner = TaskRunner.INSTANCE
)

/**
 * Tries to store [response] in the cache.
 *
 * @return a [CacheRequest] for the stored entry, or null when the response is
 *     not stored (cache-invalidating method, non-GET request, `Vary: *`, no
 *     editor available, or an I/O failure while writing).
 */
internal fun put(response: Response): CacheRequest? {
  val requestMethod = response.request.method

  // Methods that invalidate the cache evict any stored entry and are never stored.
  if (HttpMethod.invalidatesCache(requestMethod)) {
    try {
      remove(response.request)
    } catch (_: IOException) {
      // The cache cannot be written.
    }
    return null
  }

  if (requestMethod != "GET") {
    // Don't cache non-GET responses. We're technically allowed to cache HEAD requests and some
    // POST requests, but the complexity of doing so is high and the benefit is low.
    return null
  }

  // Responses whose Vary header covers everything are not stored.
  if (response.hasVaryAll()) {
    return null
  }

  val entry = Entry(response)
  var editor: DiskLruCache.Editor? = null
  return try {
    // The key is derived from the request URL; a null editor means no edit is possible now.
    editor = cache.edit(key(response.request.url)) ?: return null
    entry.writeTo(editor)
    RealCacheRequest(editor)
  } catch (_: IOException) {
    abortQuietly(editor)
    null
  }
}
/**
 * Looks up a cached response for [request].
 *
 * @return the cached response, or null when there is no snapshot for the
 *     request's key, the cache cannot be read, or the stored entry does not
 *     match the request.
 */
internal fun get(request: Request): Response? {
  val cacheKey = key(request.url)

  val snapshot: DiskLruCache.Snapshot = try {
    cache.get(cacheKey) ?: return null
  } catch (_: IOException) {
    return null // Give up because the cache cannot be read.
  }

  val entry: Entry = try {
    Entry(snapshot.getSource(ENTRY_METADATA))
  } catch (_: IOException) {
    snapshot.closeQuietly()
    return null
  }

  val cached = entry.response(snapshot)
  if (entry.matches(request, cached)) {
    return cached
  }

  // Stored entry does not match this request: release the body and report a miss.
  cached.body?.closeQuietly()
  return null
}
// Fragment of a larger function (its header is not part of this excerpt):
// offers the network response to the cache, or evicts the stale entry.
if (cache != null) {
  // Only responses that promise a body and are cacheable get stored.
  val storable = response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)
  if (storable) {
    // Offer this request to the cache.
    val cacheRequest = cache.put(response)
    val writingResponse = cacheWritingResponse(cacheRequest, response)
    if (cacheResponse != null) {
      // This will log a conditional cache miss only.
      listener.cacheMiss(call)
    }
    return writingResponse
  }

  if (HttpMethod.invalidatesCache(networkRequest.method)) {
    try {
      cache.remove(networkRequest)
    } catch (_: IOException) {
      // The cache cannot be written.
    }
  }
}
/**
 * Returns true if [response] may be stored to serve later requests.
 *
 * The status code is checked first, then the `no-store` directive on both the
 * response and the [request].
 */
fun isCacheable(response: Response, request: Request): Boolean {
  // Always go to network for uncacheable response codes (RFC 7231 section 6.1), This
  // implementation doesn't support caching partial content.
  val statusPermitsCaching = when (response.code) {
    HTTP_OK,
    HTTP_NOT_AUTHORITATIVE,
    HTTP_NO_CONTENT,
    HTTP_MULT_CHOICE,
    HTTP_MOVED_PERM,
    HTTP_NOT_FOUND,
    HTTP_BAD_METHOD,
    HTTP_GONE,
    HTTP_REQ_TOO_LONG,
    HTTP_NOT_IMPLEMENTED,
    StatusLine.HTTP_PERM_REDIRECT -> {
      // These codes can be cached unless headers forbid it.
      true
    }

    HTTP_MOVED_TEMP,
    StatusLine.HTTP_TEMP_REDIRECT -> {
      // These codes can only be cached with the right response headers.
      // http://tools.ietf.org/html/rfc7234#section-3
      // s-maxage is not checked because OkHttp is a private cache that should ignore s-maxage.
      response.header("Expires") != null ||
        response.cacheControl.maxAgeSeconds != -1 ||
        response.cacheControl.isPublic ||
        response.cacheControl.isPrivate
    }

    else -> {
      // All other codes cannot be cached.
      false
    }
  }
  if (!statusPermitsCaching) return false

  // A 'no-store' directive on request or response prevents the response from being cached.
  return !response.cacheControl.noStore && !request.cacheControl.noStore
}
/**
 * Finds a new or pooled connection to carry a forthcoming request and response.
 *
 * @throws IllegalStateException if the call has been released or a request or
 *     response body from a previous exchange is still open.
 * @throws IOException with message "Canceled" if the call is canceled.
 */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
  synchronized(this) {
    check(expectMoreExchanges) { "released" }
    check(!responseBodyOpen)
    check(!requestBodyOpen)
  }

  // The finder supplies the codec; the new exchange wraps both.
  val finder = this.exchangeFinder!!
  val codec = finder.find(client, chain)
  val newExchange = Exchange(this, eventListener, finder, codec).also { created ->
    this.interceptorScopedExchange = created
    this.exchange = created
  }

  synchronized(this) {
    this.requestBodyOpen = true
    this.responseBodyOpen = true
  }

  if (canceled) throw IOException("Canceled")
  return newExchange
}
/**
 * Repeatedly calls [findConnection] until it yields a connection that passes
 * the health check, and returns it.
 *
 * @throws IOException with message "exhausted all routes" when an unhealthy
 *     connection was found and there is nothing left to try.
 */
@Throws(IOException::class)
private fun findHealthyConnection(
  connectTimeout: Int,
  readTimeout: Int,
  writeTimeout: Int,
  pingIntervalMillis: Int,
  connectionRetryEnabled: Boolean,
  doExtensiveHealthChecks: Boolean
): RealConnection {
  while (true) {
    val candidate = findConnection(
      connectTimeout = connectTimeout,
      readTimeout = readTimeout,
      writeTimeout = writeTimeout,
      pingIntervalMillis = pingIntervalMillis,
      connectionRetryEnabled = connectionRetryEnabled
    )

    // Confirm that the connection is good.
    if (candidate.isHealthy(doExtensiveHealthChecks)) {
      return candidate
    }

    // If it isn't, take it out of the pool.
    candidate.noNewExchanges()

    // Make sure we have some routes left to try. One example where we may exhaust all the routes
    // would happen if we made a new connection and it immediately is detected as unhealthy.
    // A missing selection or selector counts as "something left" because it has not been asked yet.
    val somethingLeftToTry = nextRouteToTry != null ||
      (routeSelection?.hasNext() ?: true) ||
      (routeSelector?.hasNext() ?: true)

    if (!somethingLeftToTry) {
      throw IOException("exhausted all routes")
    }
  }
}
/**
 * Returns a connection to host the call's next exchange. Sources are tried in
 * this order: the connection already held by the call, a pooled connection,
 * a pooled connection matched after route selection, and finally a freshly
 * established connection.
 *
 * @throws IOException with message "Canceled" if the call is canceled, or any
 *     failure raised while selecting routes or connecting.
 */
@Throws(IOException::class)
private fun findConnection(
  connectTimeout: Int,
  readTimeout: Int,
  writeTimeout: Int,
  pingIntervalMillis: Int,
  connectionRetryEnabled: Boolean
): RealConnection {
  if (call.isCanceled()) throw IOException("Canceled")

  // Attempt to reuse the connection from the call.
  val heldConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
  if (heldConnection != null) {
    // Release the held connection if it can no longer carry this exchange.
    val socketToClose: Socket? = synchronized(heldConnection) {
      val unusable = heldConnection.noNewExchanges ||
        !sameHostAndPort(heldConnection.route().address.url)
      if (unusable) call.releaseConnectionNoEvents() else null
    }

    // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
    // because we already acquired it.
    if (call.connection != null) {
      check(socketToClose == null)
      return heldConnection
    }

    // The call's connection was released.
    socketToClose?.closeQuietly()
    eventListener.connectionReleased(call, heldConnection)
  }

  // We need a new connection. Give it fresh stats.
  refusedStreamCount = 0
  connectionShutdownCount = 0
  otherFailureCount = 0

  // Attempt to get a connection from the pool.
  if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
    val pooled = call.connection!!
    eventListener.connectionAcquired(call, pooled)
    return pooled
  }

  // Nothing in the pool. Figure out what route we'll try next.
  val routes: List<Route>?
  val route: Route
  when {
    nextRouteToTry != null -> {
      // Use a route from a preceding coalesced connection.
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    }

    routeSelection != null && routeSelection!!.hasNext() -> {
      // Use a route from an existing route selection.
      routes = null
      route = routeSelection!!.next()
    }

    else -> {
      // Compute a new route selection. This is a blocking operation!
      val selector = routeSelector
        ?: RouteSelector(address, call.client.routeDatabase, call, eventListener)
          .also { created -> this.routeSelector = created }
      val selection = selector.next()
      routeSelection = selection
      routes = selection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. We have a better chance of matching thanks to connection coalescing.
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val pooled = call.connection!!
        eventListener.connectionAcquired(call, pooled)
        return pooled
      }

      route = selection.next()
    }
  }

  // Connect. Tell the call about the connecting call so async cancels work.
  val freshConnection = RealConnection(connectionPool, route)
  call.connectionToCancel = freshConnection
  try {
    freshConnection.connect(
      connectTimeout,
      readTimeout,
      writeTimeout,
      pingIntervalMillis,
      connectionRetryEnabled,
      call,
      eventListener
    )
  } finally {
    call.connectionToCancel = null
  }
  call.client.routeDatabase.connected(freshConnection.route())

  // If we raced another call connecting to this host, coalesce the connections. This makes for 3
  // different lookups in the connection pool!
  if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
    val pooled = call.connection!!
    // Remember the route of the discarded connection so a later attempt can use it.
    nextRouteToTry = route
    freshConnection.socket().closeQuietly()
    eventListener.connectionAcquired(call, pooled)
    return pooled
  }

  // Publish the new connection to the pool and attach it to the call.
  synchronized(freshConnection) {
    connectionPool.put(freshConnection)
    call.acquireConnectionNoEvents(freshConnection)
  }

  eventListener.connectionAcquired(call, freshConnection)
  return freshConnection
}
/**
 * Public facade over a [RealConnectionPool]; every operation is forwarded to
 * [delegate].
 */
class ConnectionPool internal constructor(
  internal val delegate: RealConnectionPool
) {
  /**
   * Creates a pool backed by a new [RealConnectionPool] that uses the shared
   * [TaskRunner.INSTANCE] and the given idle-connection settings.
   */
  constructor(
    maxIdleConnections: Int,
    keepAliveDuration: Long,
    timeUnit: TimeUnit
  ) : this(
    RealConnectionPool(
      taskRunner = TaskRunner.INSTANCE,
      maxIdleConnections = maxIdleConnections,
      keepAliveDuration = keepAliveDuration,
      timeUnit = timeUnit
    )
  )

  /** Creates a pool with defaults: at most 5 idle connections, kept alive for 5 minutes. */
  constructor() : this(5, 5, TimeUnit.MINUTES)

  /** Returns the number of idle connections in the pool. */
  fun idleConnectionCount(): Int {
    return delegate.idleConnectionCount()
  }

  /** Returns total number of connections in the pool. */
  fun connectionCount(): Int {
    return delegate.connectionCount()
  }

  /** Close and remove all idle connections in the pool. */
  fun evictAll() {
    delegate.evictAll()
  }
}
/** This is the last interceptor in the chain. It makes a network call to the server. */publicfinalclassCallServerInterceptorimplementsInterceptor{privatefinalbooleanforWebSocket;publicCallServerInterceptor(booleanforWebSocket){this.forWebSocket=forWebSocket;}@OverridepublicResponseintercept(Chainchain)throwsIOException{RealInterceptorChainrealChain=(RealInterceptorChain)chain;Exchangeexchange=realChain.exchange();Requestrequest=realChain.request();longsentRequestMillis=System.currentTimeMillis();exchange.writeRequestHeaders(request);//写入请求头booleanresponseHeadersStarted=false;Response.BuilderresponseBuilder=null;if(HttpMethod.permitsRequestBody(request.method())&&request.body()!=null){// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100// Continue" response before transmitting the request body. If we don't get that, return// what we did get (such as a 4xx response) without ever transmitting the request body.if("100-continue".equalsIgnoreCase(request.header("Expect"))){exchange.flushRequest();responseHeadersStarted=true;exchange.responseHeadersStart();responseBuilder=exchange.readResponseHeaders(true);}//写入请求体if(responseBuilder==null){if(request.body().isDuplex()){// Prepare a duplex body so that the application can send a request body later.exchange.flushRequest();BufferedSinkbufferedRequestBody=Okio.buffer(exchange.createRequestBody(request,true));request.body().writeTo(bufferedRequestBody);}else{// Write the request body if the "Expect: 100-continue" expectation was met.BufferedSinkbufferedRequestBody=Okio.buffer(exchange.createRequestBody(request,false));request.body().writeTo(bufferedRequestBody);bufferedRequestBody.close();}}else{exchange.noRequestBody();if(!exchange.connection().isMultiplexed()){// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection// from being reused. 
Otherwise we're still obligated to transmit the request body to// leave the connection in a consistent state.exchange.noNewExchangesOnConnection();}}}else{exchange.noRequestBody();}if(request.body()==null||!request.body().isDuplex()){exchange.finishRequest();}if(!responseHeadersStarted){exchange.responseHeadersStart();}if(responseBuilder==null){responseBuilder=exchange.readResponseHeaders(false);}Responseresponse=responseBuilder.request(request).handshake(exchange.connection().handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build();intcode=response.code();if(code==100){// server sent a 100-continue even though we did not request one.// try again to read the actual responseresponse=exchange.readResponseHeaders(false).request(request).handshake(exchange.connection().handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build();code=response.code();}exchange.responseHeadersEnd(response);if(forWebSocket&&code==101){// Connection is upgrading, but we need to ensure interceptors see a non-null response body.response=response.newBuilder().body(Util.EMPTY_RESPONSE).build();}else{response=response.newBuilder().body(exchange.openResponseBody(response)).build();}if("close".equalsIgnoreCase(response.request().header("Connection"))||"close".equalsIgnoreCase(response.header("Connection"))){exchange.noNewExchangesOnConnection();}if((code==204||code==205)&&response.body().contentLength()>0){thrownewProtocolException("HTTP "+code+" had non-zero Content-Length: "+response.body().contentLength());}returnresponse;}}