An analysis of the OkHttp source code (very detailed and very long)

Set a small goal first 2022-01-26 17:56:03

Preface

This article is a detailed analysis of the open-source library OkHttp. If you feel you don't know OkHttp well enough and want to learn more, I believe this article will help.

It covers a detailed walkthrough of the request flow, an explanation of the main interceptors, and my own reflections and summary. The article is long; discussion is welcome.

Usage

Usage is simple: create an OkHttpClient object and a Request object, use them to create a Call object, and then call execute() for a synchronous request or enqueue() for an asynchronous request to obtain the Response.

private final OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
    .url("https://github.com/")
    .build();
// Synchronous request
Response response = client.newCall(request).execute();
// TODO: handle the response
// Asynchronous request
client.newCall(request).enqueue(new Callback() {
  @Override
  public void onFailure(@NotNull Call call, @NotNull IOException e) {
    // TODO: handle the failed request
  }
  @Override
  public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
    // TODO: handle the response
  }
});

Introduction to basic objects

In the usage above we built an OkHttpClient object, a Request object, and a Call object. What do these objects represent, and what are they for? Let's look at each in turn.

OkHttpClient

The configuration class for requests. It uses the Builder pattern so that users can conveniently configure request parameters such as callTimeout, cookie handling, and interceptors.

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  constructor() : this(Builder())
  class Builder constructor() {
    // Dispatcher
    internal var dispatcher: Dispatcher = Dispatcher()
    // Connection pool
    internal var connectionPool: ConnectionPool = ConnectionPool()
    // Interceptors for the whole call (application interceptors)
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    // Interceptors for the network stage (network interceptors)
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
    // Event listener for the call lifecycle
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
    // Whether to retry when a connection fails
    internal var retryOnConnectionFailure = true
    // Authenticator for the origin server
    internal var authenticator: Authenticator = Authenticator.NONE
    // Whether to follow redirects
    internal var followRedirects = true
    // Whether to follow redirects between HTTP and HTTPS
    internal var followSslRedirects = true
    // Cookie handling
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
    // Cache
    internal var cache: Cache? = null
    // DNS
    internal var dns: Dns = Dns.SYSTEM
    // Proxy
    internal var proxy: Proxy? = null
    // Proxy selector
    internal var proxySelector: ProxySelector? = null
    // Authenticator for the proxy
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    // Socket factory
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    // TLS (HTTPS) socket configuration
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
    internal var x509TrustManagerOrNull: X509TrustManager? = null
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
    // Supported protocols
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
    // Hostname verification
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    // Timeout for the complete call, in milliseconds (0 means no timeout)
    internal var callTimeout = 0
    // Connect timeout, in milliseconds
    internal var connectTimeout = 10_000
    // Read timeout, in milliseconds
    internal var readTimeout = 10_000
    // Write timeout, in milliseconds
    internal var writeTimeout = 10_000
    internal var pingInterval = 0
    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
    internal var routeDatabase: RouteDatabase? = null
    ··· Omit code ···

Request

Request is also a configuration class, this time for the parameters of a single request, and it also uses the Builder pattern. Compared with OkHttpClient it is very simple, with only four parameters: the request URL, the request method, the request headers, and the request body.

class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl,
  @get:JvmName("method") val method: String,
  @get:JvmName("headers") val headers: Headers,
  @get:JvmName("body") val body: RequestBody?,
  internal val tags: Map<Class<*>, Any>
) {
  open class Builder {
    // Request URL
    internal var url: HttpUrl? = null
    // Request method, e.g. GET, POST
    internal var method: String
    // Request headers
    internal var headers: Headers.Builder
    // Request body
    internal var body: RequestBody? = null
    ··· Omit code ···
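
To make the Builder pattern described above concrete, here is a minimal sketch in plain Java. It is not OkHttp's Request class: MiniRequest, its fields, and the post(String) helper are hypothetical names chosen for illustration, and the body is reduced to a String. It only shows the shape of the pattern: a mutable Builder collects the four parameters and build() produces an immutable object.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical miniature of a Builder-configured request; not OkHttp's Request class. */
final class MiniRequest {
    final String url;
    final String method;
    final Map<String, String> headers;
    final String body; // null when the request has no body

    private MiniRequest(Builder builder) {
        this.url = builder.url;
        this.method = builder.method;
        // Copy the headers so later changes to the builder cannot affect this request.
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(builder.headers));
        this.body = builder.body;
    }

    static final class Builder {
        private String url;
        private String method = "GET"; // default method when none is chosen
        private final Map<String, String> headers = new LinkedHashMap<>();
        private String body;

        Builder url(String url) { this.url = url; return this; }
        Builder header(String name, String value) { headers.put(name, value); return this; }
        Builder post(String body) { this.method = "POST"; this.body = body; return this; }

        MiniRequest build() {
            // A request without a URL makes no sense, so fail early.
            if (url == null) throw new IllegalStateException("url == null");
            return new MiniRequest(this);
        }
    }
}

class MiniRequestDemo {
    public static void main(String[] args) {
        MiniRequest request = new MiniRequest.Builder()
                .url("https://github.com/")
                .header("Accept", "text/html")
                .build();
        System.out.println(request.method + " " + request.url + " " + request.headers);
    }
}
```

Keeping the built object immutable is what allows the same request to be shared safely between calls.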

Call

The interface for invoking a request. A Call is a request that has been prepared for execution; it can be cancelled, and it can be executed only once.

interface Call : Cloneable {
  /** Returns the original request that initiated this call. */
  fun request(): Request
  /**
   * Synchronous request; executes immediately.
   *
   * Throws two kinds of exceptions:
   * 1. IOException if the request fails;
   * 2. IllegalStateException if the call has already been executed.
   */
  @Throws(IOException::class)
  fun execute(): Response
  /**
   * Asynchronous request; schedules the request to be executed at some point in the future.
   * Throws IllegalStateException if the call has already been executed.
   */
  fun enqueue(responseCallback: Callback)
  /** Cancels the request. A request that has already completed cannot be cancelled. */
  fun cancel()
  /** Whether the call has been executed. */
  fun isExecuted(): Boolean
  /** Whether the call has been cancelled. */
  fun isCanceled(): Boolean
  /** The timeout spanning the complete call; by default taken from [OkHttpClient.Builder.callTimeout]. */
  fun timeout(): Timeout
  /** Clones this call, creating a new, identical Call. */
  public override fun clone(): Call
  /** Factory interface through which OkHttpClient creates Call objects. */
  fun interface Factory {
    fun newCall(request: Request): Call
  }
}

RealCall

In OkHttpClient we use the newCall method to create a Call object, but the source code shows that newCall actually returns a RealCall object.

OkHttpClient.kt
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

RealCall is the implementation class of the Call interface. It is the bridge between the application layer and the network layer: it exposes the application side's original request and connection data, as well as the response and other data streams returned by the network layer. As the usage shows, after creating the RealCall object we call the synchronous or asynchronous request method, so RealCall also contains the synchronous execute() and asynchronous enqueue() methods (analyzed later).

AsyncCall

An asynchronous request call. It is an inner class of RealCall and is simply a Runnable that is executed by the thread pool in the dispatcher.

inner class AsyncCall(
// The response callback method passed in by the user
private val responseCallback: Callback
) : Runnable {
// Number of in-flight requests to the same host; volatile + AtomicInteger give visibility and atomicity across threads
@Volatile var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
··· Omit code ···
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// Hand this call to the thread pool
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
// The request failed; invoke Callback.onFailure()
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// The request failed; call the dispatcher's finished method
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
// The request succeeded; obtain the response returned by the server
val response = getResponseWithInterceptorChain()
signalledCallback = true
// Invoke Callback.onResponse() to pass the response out
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
// The request failed; invoke Callback.onFailure()
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
// An unexpected exception occurred; cancel the request
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// The request failed; invoke Callback.onFailure()
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
// The request has ended; call the dispatcher's finished method
client.dispatcher.finished(this)
}
}
}
}

Dispatcher

The dispatcher, which schedules Call objects. It contains both a thread pool and the asynchronous request queues, and is used to store and execute AsyncCall objects.

class Dispatcher constructor() {
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        // Create a cached thread pool to run the request calls
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }
  /** Queue of ready asynchronous calls */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()
  /** Queue of running asynchronous calls, including cancelled AsyncCalls that have not finished yet */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()
  /** Queue of running synchronous calls, including cancelled RealCalls that have not finished yet */
  private val runningSyncCalls = ArrayDeque<RealCall>()
  ··· Omit code ···
}
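
The thread pool configuration above (no core threads, an unbounded maximum, a 60-second keep-alive, and a SynchronousQueue) can be tried out in isolation. The following is a small sketch using only java.util.concurrent; the class and method names are made up for the illustration. Because a SynchronousQueue holds no tasks, every submitted task that finds no idle thread forces the pool to start a new one, so the pool itself never makes a call wait.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DispatcherPoolSketch {
    /** Same constructor arguments as the excerpt: 0 core threads, unbounded maximum, 60 s keep-alive. */
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    /** Submits the given number of blocking tasks and reports how many threads the pool started. */
    static int threadsStartedFor(int tasks) {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy until the demo is over
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // No task could be queued and no thread was idle, so each execute() started a thread.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads started: " + threadsStartedFor(3));
    }
}
```

Since the pool is effectively unbounded, the limit on concurrency has to come from somewhere else, which is the job of the dispatcher's ready and running queues.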

To sum up

Object       Role
Call         The interface for invoking a request. It represents a request that is ready to execute; it can be cancelled and can be executed only once.
RealCall     The concrete implementation of the Call interface. It is the bridge between the application and the network layers and holds the OkHttpClient and Request information.
AsyncCall    An asynchronous request call. It is simply a Runnable that is handed to the thread pool.
Dispatcher   The dispatcher, which schedules Call objects. It contains both a thread pool and the asynchronous request queues, and is used to store and execute AsyncCall objects.
Request      The request class, containing url, method, headers, and body.
Response     The response data returned by the network layer.
Callback     The response callback interface, with two methods: onFailure and onResponse.

Process analysis

Now that the objects have been introduced, let's follow the usage steps through the source code.

Synchronous requests

This is how a synchronous request is made:

client.newCall(request).execute();

The newCall method creates a RealCall object, and then its execute() method is called.

 RealCall.kt
override fun execute(): Response {
  // CAS check that the call has not been executed yet, so it can only run once; otherwise throw
  check(executed.compareAndSet(false, true)) { "Already Executed" }
  // Start the call timeout
  timeout.enter()
  // Notify the event listener that the call has started
  callStart()
  try {
    // Call the dispatcher's executed() method, which simply adds the call to the runningSyncCalls queue
    client.dispatcher.executed(this)
    // Call getResponseWithInterceptorChain to obtain the response
    return getResponseWithInterceptorChain()
  } finally {
    // Execution is complete; the dispatcher removes the call from the runningSyncCalls queue
    client.dispatcher.finished(this)
  }
}

The dispatcher's executed method adds the current RealCall object to the runningSyncCalls queue; getResponseWithInterceptorChain is then called to obtain the response.
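
The "execute only once" guard relies on an atomic compare-and-set. As a rough illustration in plain Java (OneShotCall is a hypothetical stand-in that models only the guard, not a real network call), the same idea looks like this:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a call object; only the once-only guard is modelled. */
class OneShotCall {
    private final AtomicBoolean executed = new AtomicBoolean(false);

    /** Returns a fake response the first time; throws IllegalStateException afterwards. */
    String execute() {
        // compareAndSet flips false -> true atomically, so exactly one caller can win,
        // even when several threads race to execute the same call.
        if (!executed.compareAndSet(false, true)) {
            throw new IllegalStateException("Already Executed");
        }
        return "response";
    }

    boolean isExecuted() {
        return executed.get();
    }
}

class OneShotCallDemo {
    public static void main(String[] args) {
        OneShotCall call = new OneShotCall();
        System.out.println(call.execute());
        try {
            call.execute();
        } catch (IllegalStateException e) {
            System.out.println("second call rejected: " + e.getMessage());
        }
    }
}
```

A plain boolean field with an if-check would leave a window in which two threads both see false; the single atomic operation closes that window without a lock.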

Asynchronous requests

Now let's look at asynchronous requests.

 RealCall.kt
override fun enqueue(responseCallback: Callback) {
  // CAS check that the call has not been executed yet, so it can only run once; otherwise throw
  check(executed.compareAndSet(false, true)) { "Already Executed" }
  // Notify the event listener that the call has started
  callStart()
  // Create a new AsyncCall and add it to the readyAsyncCalls queue through the dispatcher's enqueue method
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}

This in turn calls the dispatcher's enqueue method:

 Dispatcher.kt
internal fun enqueue(call: AsyncCall) {
// Lock to ensure thread safety
synchronized(this) {
// Add the call to the readyAsyncCalls queue
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
// Look for an existing call to the same host; if there is one, share its callsPerHost counter
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// Run whatever calls can be run
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
// Whether any call is currently running
val isRunning: Boolean
// Lock to ensure thread safety
synchronized(this) {
// Traverse the readyAsyncCalls queue
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// The number of running asynchronous calls may not exceed the maximum number of concurrent requests (64 by default)
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
// At most maxRequestsPerHost (5 by default) calls to the same host may run at the same time
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
// Remove the call from readyAsyncCalls and add it to executableCalls and runningAsyncCalls
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
// Whether any call is running is determined from the number of calls in the running queues
isRunning = runningCallsCount() > 0
}
// Traverse the executable calls and hand each AsyncCall to the thread pool
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}

The dispatcher's enqueue method adds the AsyncCall to the readyAsyncCalls queue and then calls promoteAndExecute to run the requests. promoteAndExecute traverses the readyAsyncCalls queue and hands the calls that satisfy the limits to the thread pool, which means AsyncCall.run() is eventually executed.
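
The promotion logic is easier to follow with the threading stripped away. The sketch below is a simplified, single-threaded model in plain Java, not the real Dispatcher: MiniDispatcher and its methods are hypothetical, a call is represented only by its host name, and nothing is actually executed. It keeps the two rules from the excerpt: stop promoting when the global limit is reached, and skip calls whose host is already at its per-host limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Simplified, single-threaded model of the ready/running bookkeeping; names are illustrative. */
class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> ready = new ArrayDeque<>();   // hosts of calls waiting to run
    private final Deque<String> running = new ArrayDeque<>(); // hosts of calls currently running
    private final Map<String, Integer> callsPerHost = new HashMap<>();

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    /** Queues a call for the host, then promotes whatever the limits allow. Returns the promoted hosts. */
    List<String> enqueue(String host) {
        ready.add(host);
        return promote();
    }

    /** Marks one running call for the host as finished, then promotes waiting calls. */
    List<String> finished(String host) {
        if (!running.remove(host)) throw new IllegalStateException("Call wasn't in-flight!");
        callsPerHost.merge(host, -1, Integer::sum);
        return promote();
    }

    private List<String> promote() {
        List<String> promoted = new ArrayList<>();
        Iterator<String> i = ready.iterator();
        while (i.hasNext()) {
            String host = i.next();
            if (running.size() >= maxRequests) break; // global limit reached, stop entirely
            if (callsPerHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // host is full, try the next call
            i.remove();
            callsPerHost.merge(host, 1, Integer::sum);
            promoted.add(host);
            running.add(host);
        }
        return promoted;
    }

    int readyCount() { return ready.size(); }
    int runningCount() { return running.size(); }
}

class MiniDispatcherDemo {
    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher(64, 5);
        for (int n = 0; n < 7; n++) dispatcher.enqueue("a.example");
        System.out.println(dispatcher.runningCount() + " running, " + dispatcher.readyCount() + " waiting");
        System.out.println("promoted after finish: " + dispatcher.finished("a.example"));
    }
}
```

Note the difference between break and continue: a full host only blocks its own calls, while a full dispatcher blocks everyone.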

For the code of AsyncCall.run(), see AsyncCall under "Introduction to basic objects"; it is not repeated here. In short, it calls getResponseWithInterceptorChain to obtain the response and passes it out through Callback.onResponse. If instead the request fails and an exception is caught, the exception is passed out through Callback.onFailure. Finally, when the request ends, the dispatcher's finished method is called.

 Dispatcher.kt
/** Called when an asynchronous call ends */
internal fun finished(call: AsyncCall) {
  call.callsPerHost.decrementAndGet()
  finished(runningAsyncCalls, call)
}
/** Called when a synchronous call ends */
internal fun finished(call: RealCall) {
  finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
  val idleCallback: Runnable?
  synchronized(this) {
    // Remove the current call from its running queue
    if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
    idleCallback = this.idleCallback
  }
  // Continue with the remaining calls: move calls from readyAsyncCalls to runningAsyncCalls and execute them
  val isRunning = promoteAndExecute()
  if (!isRunning && idleCallback != null) {
    // All calls have finished and the dispatcher is idle; run the idle callback
    idleCallback.run()
  }
}

Obtaining the Response

Next, let's see how getResponseWithInterceptorChain obtains the response.

 internal fun getResponseWithInterceptorChain(): Response {
// Build the list of interceptors
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
// Build the interceptor chain (chain of responsibility)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
// Whether noMoreExchanges has already been called, i.e. the call is complete and there is nothing more to exchange
var calledNoMoreExchanges = false
try {
// Run the interceptor chain to obtain the response
val response = chain.proceed(originalRequest)
// If the call was cancelled, close the response and throw an exception
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}

In short: this uses the chain-of-responsibility design pattern. The interceptors are assembled into a RealInterceptorChain, and calling its proceed method yields the response.

So what is an Interceptor, and what is the interceptor chain of responsibility?

Interceptor

Interceptor declares a single interception method, which is implemented by its subclasses. It also contains a Chain interface whose core method is proceed(request), which processes the request and obtains the response.

fun interface Interceptor {
/** Interception method */
@Throws(IOException::class)
fun intercept(chain: Chain): Response
interface Chain {
/** The request being processed */
fun request(): Request
/** The core method: processes the request and obtains the response */
@Throws(IOException::class)
fun proceed(request: Request): Response
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
}

RealInterceptorChain

The interceptor chain implements the Interceptor.Chain interface; the key point is its override of the proceed method.

class RealInterceptorChain(
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
internal val connectTimeoutMillis: Int,
internal val readTimeoutMillis: Int,
internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
··· Omit code ···
private var calls: Int = 0
override fun call(): Call = call
override fun request(): Request = request
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
// Copy the chain with index + 1, so that the copy points at the next handler in the chain, i.e. the next interceptor
val next = copy(index = index + 1, request = request)
// Get the current interceptor
val interceptor = interceptors[index]
// Run the current interceptor's intercept method
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
}

Through these chained calls, every interceptor in the interceptor list is eventually executed, and the Response is returned.
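
The index + 1 trick is the heart of the pattern, so here is a stripped-down sketch of it in plain Java. MiniInterceptor and MiniChain are hypothetical names, and requests and responses are reduced to strings; the sketch leaves out the exchange checks and only shows how each proceed call hands a copy of the chain, advanced by one position, to the current interceptor.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of an interceptor: it receives the rest of the chain. */
interface MiniInterceptor {
    String intercept(MiniChain chain);
}

/** Hypothetical miniature of the chain; requests and responses are plain strings. */
final class MiniChain {
    private final List<MiniInterceptor> interceptors;
    private final int index;
    private final String request;

    MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    String proceed(String request) {
        if (index >= interceptors.size()) throw new IllegalStateException("no interceptor left");
        // The copy with index + 1 is what the current interceptor sees as "the rest of the chain".
        MiniChain next = new MiniChain(interceptors, index + 1, request);
        return interceptors.get(index).intercept(next);
    }
}

class MiniChainDemo {
    static String run(String request) {
        List<MiniInterceptor> interceptors = new ArrayList<>();
        // Each interceptor may change the request on the way in and the response on the way out.
        interceptors.add(chain -> "A(" + chain.proceed(chain.request() + "+a") + ")");
        interceptors.add(chain -> "B(" + chain.proceed(chain.request() + "+b") + ")");
        // The last interceptor produces the response and never calls proceed.
        interceptors.add(chain -> "response to " + chain.request());
        return new MiniChain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("req")); // prints "A(B(response to req+a+b))"
    }
}
```

The nesting in the output mirrors the call stack: the request travels down through A and B, and the response travels back up through B and then A.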

Interceptors

OK, it's time to look at the individual interceptors in the interceptor list.

Here is a summary of the interceptors first, in order:

  1. client.interceptors: set by the developer. These intercept first, before all the other interceptors, and can be used to add common parameters such as custom headers or custom logging.
  2. RetryAndFollowUpInterceptor: does some initialization for the connection, retries failed requests, and issues the follow-up requests for redirects. As its name says, it handles retries and follow-ups.
  3. BridgeInterceptor: the bridge between the client and the server. It converts the request built by the user into the request the server needs, and converts the response returned by the network into a response the user can consume.
  4. CacheInterceptor: handles everything related to caching. Based on the cache configured by the user in OkHttpClient and on the request, it creates a cache strategy that decides whether the response is built from the network or from the cache.
  5. ConnectInterceptor: responsible for establishing the connection, either a TCP connection or a TLS connection.
  6. client.networkInterceptors: also set by the developer, so in essence similar to the first kind of interceptor, but because of its different position in the chain its uses are different.
  7. CallServerInterceptor: performs the request and response of network data, i.e. the actual network I/O. It sends the request headers and body to the server and parses the response returned by the server.

Next, let's go through these interceptors one by one, from top to bottom.

client.interceptors

These are user-defined interceptors, called application interceptors, and they are stored in OkHttpClient's interceptors: List<Interceptor> list. They come first in the interceptor chain, so their intercept method runs before any other. We can use one to add custom header information, for example:

class HeaderInterceptor implements Interceptor {
  @Override
  public Response intercept(Chain chain) throws IOException {
    Request request = chain.request().newBuilder()
        .addHeader("device-android", "xxxxxxxxxxx")
        .addHeader("country-code", "ZH")
        .build();
    return chain.proceed(request);
  }
}
// Then add it to the OkHttpClient
OkHttpClient client = new OkHttpClient.Builder()
    .connectTimeout(60, TimeUnit.SECONDS)
    .readTimeout(15, TimeUnit.SECONDS)
    .writeTimeout(15, TimeUnit.SECONDS)
    .cookieJar(new MyCookieJar())
    .addInterceptor(new HeaderInterceptor()) // Add the custom header interceptor
    .build();

RetryAndFollowUpInterceptor

The second interceptor. As its name suggests, it is responsible for retrying failed requests and for issuing the follow-up requests for redirects; it also does some initialization work for the connection.

class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
// A new ExchangeFinder is created here; ConnectInterceptor will use it
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
// An attempt to communicate with the server failed. The request may have been sent.
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
// Based on the response code, build a follow-up request for a retry or a redirect
val followUp = followUpRequest(response, exchange)
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
// A one-shot request body cannot be sent again, so return the response as it is
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
// Maximum number of follow-ups; browsers differ, e.g. Chrome follows 21 redirects and Safari 16
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
/** Decides whether to retry: false means do not retry, true means try again. */
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
// The application layer has forbidden retries
if (!client.retryOnConnectionFailure) return false
// The request body cannot be sent again
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// The exception is fatal and cannot be recovered from, e.g. ProtocolException
if (!isRecoverable(e, requestSendStarted)) return false
// There are no more routes to attempt
if (!call.retryAfterFailure()) return false
// For failure recovery, use the same route selector with a new connection
return true
}
··· Omit code ···
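
The control flow of the loop above, reduced to its skeleton, can be sketched in plain Java as follows. Everything here is a simplification made for illustration: FollowUpSketch, Resp, the transport function, and the routes parameter (a stand-in for "are there more routes to try") are hypothetical; only redirects are treated as follow-ups; and an unchecked IllegalStateException is thrown where the excerpt throws ProtocolException. MAX_FOLLOW_UPS is set to 20, the value of the constant in OkHttp.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Function;

class FollowUpSketch {
    static final int MAX_FOLLOW_UPS = 20;

    /** Minimal response: a status code and, for redirects, the target URL. */
    static final class Resp {
        final int code;
        final String location;

        Resp(int code, String location) {
            this.code = code;
            this.location = location;
        }
    }

    /** Sends the request, retrying on failure and following redirects up to the limit. */
    static Resp execute(String url, Function<String, Resp> transport,
                        boolean retryOnConnectionFailure, int routes) {
        int followUpCount = 0;
        int failures = 0;
        String current = url;
        while (true) {
            Resp response;
            try {
                response = transport.apply(current);
            } catch (UncheckedIOException e) {
                // Retry only if the client allows it and another route is left to try.
                if (!retryOnConnectionFailure || ++failures >= routes) throw e;
                continue;
            }
            String followUp = followUpUrl(response);
            if (followUp == null) return response; // nothing to follow: this is the final response
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new IllegalStateException("Too many follow-up requests: " + followUpCount);
            }
            current = followUp;
        }
    }

    /** Returns the URL to request next, or null when the response needs no follow-up. */
    static String followUpUrl(Resp response) {
        switch (response.code) {
            case 301: case 302: case 303: case 307: case 308:
                return response.location;
            default:
                return null;
        }
    }
}

class FollowUpDemo {
    public static void main(String[] args) {
        int[] attempts = {0};
        FollowUpSketch.Resp result = FollowUpSketch.execute("http://a.example/", url -> {
            // Fail the very first attempt to show the retry path.
            if (attempts[0]++ == 0) throw new UncheckedIOException(new IOException("connect failed"));
            return url.startsWith("http://")
                    ? new FollowUpSketch.Resp(301, url.replace("http://", "https://"))
                    : new FollowUpSketch.Resp(200, null);
        }, true, 2);
        System.out.println(result.code + " after " + attempts[0] + " attempts");
    }
}
```

The follow-up limit is what protects the client from a server that redirects in a loop.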

BridgeInterceptor

As its name suggests, it is the bridge between the client and the server. It converts the request built by the user into the request the server needs, for example by adding Content-Type, Cookie, and User-Agent headers. It then processes the response returned by the server to turn it into the response the client needs, for example by removing Content-Encoding and Content-Length from the response headers.

class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// Get original request data
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
// Rebuild the request headers from the request body (condensed from the original code)
val body = userRequest.body
if (body != null) {
body.contentType()?.let { requestBuilder.header("Content-Type", it.toString()) }
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
}
}
if (userRequest.header("Host") == null) requestBuilder.header("Host", userRequest.url.toHostHeader())
if (userRequest.header("Connection") == null) requestBuilder.header("Connection", "Keep-Alive")
··· Omit code ···
// Add cookies
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
// Add the User-Agent
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
// Build the new Request, then run the next interceptor to process it
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
// Create a new response builder so that the original user request is attached to the response
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
// Replace the response headers, with Content-Encoding and Content-Length removed
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
// Replace the response body with the decompressed stream
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
··· Omit code ···
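
Why are Content-Encoding and Content-Length removed? Once the body has been decompressed, neither header describes it any longer. The following plain-Java sketch illustrates the idea with java.util.zip; GzipBridgeSketch and its methods are hypothetical, headers are modelled as a case-insensitive map, and the whole body is decoded into a byte array, whereas the excerpt wraps the body in a streaming GzipSource.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipBridgeSketch {
    /** Header names are case-insensitive, so the map ignores case as well. */
    static Map<String, String> newHeaders() {
        return new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    }

    /** Compresses text; used only to fabricate a "server" body for the demo. */
    static byte[] gzip(String text) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
                gz.write(text.getBytes(StandardCharsets.UTF_8));
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Decodes a gzip-encoded body and strips the two headers in place; other bodies pass through. */
    static byte[] decodeIfGzip(Map<String, String> headers, byte[] body) {
        if (!"gzip".equalsIgnoreCase(headers.get("Content-Encoding"))) return body;
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) out.write(buffer, 0, n);
            // The decoded body is no longer gzip and has a different length.
            headers.remove("Content-Encoding");
            headers.remove("Content-Length");
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = newHeaders();
        headers.put("Content-Encoding", "gzip");
        byte[] decoded = decodeIfGzip(headers, gzip("hello"));
        System.out.println(new String(decoded, StandardCharsets.UTF_8) + " " + headers);
    }
}
```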

CacheInterceptor

Users can configure a cache through OkHttpClient.cache. The cache interceptor uses CacheStrategy to decide whether the response is built from the network or from the cache.
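
Before reading the source, it may help to see the outcomes of that decision side by side. The sketch below is a plain-Java summary under simplifying assumptions: CacheDecisionSketch, Source, and decide are hypothetical names, and the two booleans stand for "the strategy produced a network request" and "the strategy produced a usable cached response". How the strategy arrives at those two values is not modelled.

```java
class CacheDecisionSketch {
    enum Source { GATEWAY_TIMEOUT_504, CACHE, VALIDATED_CACHE, NETWORK }

    /**
     * networkAllowed: the strategy produced a network request.
     * cacheUsable:    the strategy produced a cached response.
     * networkCode:    status code of the network response; ignored when the network is not used.
     */
    static Source decide(boolean networkAllowed, boolean cacheUsable, int networkCode) {
        // Neither network nor cache can satisfy the request: a synthetic 504 is returned.
        if (!networkAllowed && !cacheUsable) return Source.GATEWAY_TIMEOUT_504;
        // The network is not needed: answer from the cache alone.
        if (!networkAllowed) return Source.CACHE;
        // Conditional request answered with 304 Not Modified: reuse the cached body.
        if (cacheUsable && networkCode == 304) return Source.VALIDATED_CACHE;
        // Otherwise the network response is used.
        return Source.NETWORK;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, 0));
        System.out.println(decide(false, true, 0));
        System.out.println(decide(true, true, 304));
        System.out.println(decide(true, true, 200));
    }
}
```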

class CacheInterceptor(internal val cache: Cache?) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
// Look up a candidate response for this request in OkHttpClient.cache
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// Create the cache strategy, which decides how the cache is used
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
// null means the network is not used; otherwise the network is used
val networkRequest = strategy.networkRequest
// null means the cache is not used; otherwise the cache is used
val cacheResponse = strategy.cacheResponse
// Track network and cache usage
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
// The cache candidate exists but is not applicable; close it
if (cacheCandidate != null && cacheResponse == null) {
cacheCandidate.body?.closeQuietly()
}
// The network is forbidden and there is no usable cached response: build a response with code 504 and return it
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// The network is not needed and a cached response exists: build the response directly from the cache and return it
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
// Notify the event listener of a conditional cache hit or a cache miss
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
// Continue down the chain of responsibility; the response from the server is assigned to networkResponse
networkResponse = chain.proceed(networkRequest)
} finally {
// If an I/O error or other exception aborted the request, networkResponse is still null; close the cache candidate's body so that it does not leak
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// If there is also a cached response, the network request was a conditional one
if (cacheResponse != null) {
// The server answered 304 Not Modified: build a new Response from the cached content and return it
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
// Otherwise, close the cached response body
cacheResponse.body?.closeQuietly()
}
}
// Build the response from the network response
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
// If cache is not null, i.e. the user configured a cache on OkHttpClient, store the response built in the previous step in it
if (cache != null) {
// Whether the response can be cached depends on its status code, its headers and CacheControl.noStore
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Store the response in the cache
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}
// Some request methods invalidate cached data (in OkHttp: POST, PATCH, PUT, DELETE and MOVE), so any entry cached for this request is removed
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
// Remove the now-invalid entry for this request from the cache
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
··· Omit code ···
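
The branches in intercept() reduce to a small decision table over the two values that the strategy produces. The following is a minimal sketch in plain Java, not OkHttp code: the names CacheDecisionSketch, Outcome and decide are hypothetical, and two booleans stand in for "networkRequest != null" and "cacheResponse != null".

```java
// Hypothetical model of the branches in CacheInterceptor.intercept(); not OkHttp code.
final class CacheDecisionSketch {
    enum Outcome { GATEWAY_TIMEOUT_504, CACHE_HIT, CONDITIONAL_CACHE_HIT, NETWORK_RESPONSE }

    // hasNetworkRequest: stands for strategy.networkRequest != null
    // hasCacheResponse:  stands for strategy.cacheResponse != null
    // networkCode:       status code from the network (ignored when the network is not used)
    static Outcome decide(boolean hasNetworkRequest, boolean hasCacheResponse, int networkCode) {
        if (!hasNetworkRequest && !hasCacheResponse) {
            return Outcome.GATEWAY_TIMEOUT_504;   // network forbidden and cache insufficient
        }
        if (!hasNetworkRequest) {
            return Outcome.CACHE_HIT;             // answered from the cache alone
        }
        if (hasCacheResponse && networkCode == 304) {
            return Outcome.CONDITIONAL_CACHE_HIT; // server confirmed the cached copy
        }
        return Outcome.NETWORK_RESPONSE;          // use what the network returned
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, 0));
        System.out.println(decide(false, true, 0));
        System.out.println(decide(true, true, 304));
        System.out.println(decide(true, true, 200));
        System.out.println(decide(true, false, 200));
    }
}
```

Only the last outcome can lead to a cache write, which matches the order of the checks in the interceptor.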

ConnectInterceptor

ConnectInterceptor is responsible for establishing the actual connection to the server.

object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// Initialize an Exchange object
val exchange = realChain.call.initExchange(chain)
// Copy the interceptor chain so that the copy carries this Exchange
val connectedChain = realChain.copy(exchange = exchange)
// Proceed with the copied chain
return connectedChain.proceed(realChain.request)
}
}

A quick look shows that the code is very simple. The intercept method has only three steps:

  1. Initialize an Exchange object.
  2. Create a copy of the interceptor chain that carries this Exchange.
  3. Proceed with the copied chain.

So what is this Exchange object?

RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange {
... Omit code ...
// The exchangeFinder here was created in RetryAndFollowUpInterceptor
val exchangeFinder = this.exchangeFinder!!
// Returns an ExchangeCodec, a codec that encodes the request and decodes the response
val codec = exchangeFinder.find(client, chain)
// Build a new Exchange object from exchangeFinder and codec, and return it
val result = Exchange(this, eventListener, exchangeFinder, codec)
... Omit code ...
return result
}

Let's look at the ExchangeFinder.find() step in detail:

ExchangeFinder.kt
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
// Find a healthy, usable connection; returns a RealConnection object
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
// Based on the connection, create and return a request/response codec: Http1ExchangeCodec or Http2ExchangeCodec, for HTTP/1 and HTTP/2 respectively
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}

Next, look at the findHealthyConnection method:

ExchangeFinder.kt
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
// Key step: find a connection
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// Check whether the connection is healthy; if it is, return it directly
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
// The connection is unhealthy: mark it as unusable for new exchanges so that it is removed from the connection pool
candidate.noNewExchanges()
... Omit code ...
}
}

In short: findConnection finds a connection, which is then checked for health. A healthy connection is returned directly; otherwise the loop tries again.

So the core method is findConnection. Let's take a closer look at it:

private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
// Attempt 1: try to reuse the connection the call already holds, so that no new connection has to be acquired
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
// If the call's connection has not been released, reuse it.
if (call.connection != null) {
check(toClose == null)
return callConnection
}
// The call's connection has been released, so close the socket.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
// A new connection is required , So reset some states
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt 2: try to get a connection from the connection pool, without routes and without requiring multiplexing
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// Nothing suitable was found in the pool, so prepare the routes for the next attempt
val routes: List<Route>?
val route: Route
... Omit code ...
// Attempt 3: try the connection pool again, this time with routes but still without requiring multiplexing
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
// Attempt 4: create a new connection ourselves
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
// Attempt 5: try the connection pool once more, with routes and requiring multiplexing.
// This is a final check: if another call has pooled a multiplexed connection to the same address in the meantime, reuse it and discard the connection that was just created.
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
// Put the newly created connection into the connection pool
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}

As the code shows, there are five attempts in total to obtain a connection:

  1. Attempt 1: reuse the connection the call already holds, so that no new connection has to be acquired.
  2. Attempt 2: get a connection from the connection pool, without routes and without requiring multiplexing.
  3. Attempt 3: try the connection pool again, with routes but without requiring multiplexing.
  4. Attempt 4: create a new connection.
  5. Attempt 5: try the connection pool once more, with routes and requiring multiplexing.

OK, at this point the connection has been established.
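
The order of the five attempts can be modelled in a few lines of plain Java. This is a deliberately simplified sketch under stated assumptions, not OkHttp code: FindConnectionSketch, Conn, fromPool and find are hypothetical names, routes are left out (so attempts 2 and 3 collapse into one pool lookup), and the whileConnecting callback stands in for other calls that run while the new connection is being set up.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the lookup order in findConnection(); not OkHttp code.
final class FindConnectionSketch {
    static final class Conn {
        final String host;
        final boolean multiplexed;

        Conn(String host, boolean multiplexed) {
            this.host = host;
            this.multiplexed = multiplexed;
        }
    }

    // Returns the first pooled connection to the host, or null if there is none.
    static Conn fromPool(List<Conn> pool, String host, boolean requireMultiplexed) {
        for (Conn c : pool) {
            if (requireMultiplexed && !c.multiplexed) continue;
            if (c.host.equals(host)) return c;
        }
        return null;
    }

    static Conn find(Conn callConnection, List<Conn> pool, String host, Runnable whileConnecting) {
        // Attempt 1: reuse the connection the call already holds.
        if (callConnection != null && callConnection.host.equals(host)) return callConnection;
        // Attempts 2 and 3: look in the pool (in OkHttp the third lookup also passes the routes).
        Conn pooled = fromPool(pool, host, false);
        if (pooled != null) return pooled;
        // Attempt 4: create a new connection; other calls may touch the pool meanwhile.
        Conn created = new Conn(host, false);
        whileConnecting.run();
        // Attempt 5: prefer a multiplexed connection that another call pooled in the meantime.
        Conn raced = fromPool(pool, host, true);
        if (raced != null) return raced;
        pool.add(created);
        return created;
    }

    public static void main(String[] args) {
        List<Conn> pool = new ArrayList<>();
        Conn first = find(null, pool, "github.com", () -> {});
        Conn second = find(null, pool, "github.com", () -> {});
        System.out.println("second call reused the pooled connection: " + (first == second));
    }
}
```

The fifth lookup only matters when something changes the pool between the creation of the connection and the final check, which is why the sketch needs the callback to exercise it.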

client.networkInterceptors

These interceptors are called network interceptors. Like client.interceptors, they are defined by the user and stored as a list in OkHttpClient.

What is the difference between the two kinds of interceptor?

The difference comes from their positions in the chain. Application interceptors are in the first position, so they are always executed, and only once. Network interceptors are in the second-to-last position, so they are not necessarily executed, and they may be executed several times. For example, when RetryAndFollowUpInterceptor fails or CacheInterceptor returns the cached response directly, the network interceptors are not executed at all, whereas a retry or a followed redirect makes them run again.
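
A miniature chain makes the difference countable. The sketch below is plain Java with hypothetical names (InterceptorOrderSketch, Chain, run); strings stand in for requests and responses, and the five lambdas only imitate the roles of the application interceptor, the retry/follow-up interceptor, the cache interceptor, the network interceptor and the call-server interceptor. It is an illustration of the ordering, not OkHttp code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature of an interceptor chain; not OkHttp code.
final class InterceptorOrderSketch {
    interface Interceptor {
        String intercept(Chain chain);
    }

    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        // Hands the request to the next interceptor in the list.
        String proceed(String request) {
            return interceptors.get(index).intercept(new Chain(interceptors, index + 1, request));
        }
    }

    // Returns {application interceptor runs, network interceptor runs} for one call.
    static int[] run(boolean cacheHit, int redirects) {
        int[] counts = new int[2];
        int[] redirectsLeft = {redirects};
        List<Interceptor> interceptors = new ArrayList<>();
        // Application interceptor: first in the chain.
        interceptors.add(c -> { counts[0]++; return c.proceed(c.request); });
        // Retry/follow-up role: proceeds again for every redirect.
        interceptors.add(c -> {
            String response = c.proceed(c.request);
            while (response.equals("redirect")) response = c.proceed(c.request);
            return response;
        });
        // Cache role: short-circuits the chain on a cache hit.
        interceptors.add(c -> cacheHit ? "cached" : c.proceed(c.request));
        // Network interceptor: second to last.
        interceptors.add(c -> { counts[1]++; return c.proceed(c.request); });
        // Call-server role: last, never calls proceed.
        interceptors.add(c -> redirectsLeft[0]-- > 0 ? "redirect" : "ok");
        new Chain(interceptors, 0, "GET /").proceed("GET /");
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(true, 0)));
        System.out.println(Arrays.toString(run(false, 0)));
        System.out.println(Arrays.toString(run(false, 2)));
    }
}
```

With a cache hit the network interceptor count stays at zero, and every followed redirect adds one more run, while the application interceptor count is always one.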

CallServerInterceptor

At this point the client and the server have established a connection. What remains is to send the request headers and the request body to the server and to parse the response that the server returns.

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
try {
// Write the request headers
exchange.writeRequestHeaders(request)
// If the method permits a request body (anything except GET and HEAD) and the request body is not null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If the request carries an "Expect: 100-continue" header, wait for the server's "HTTP/1.1 100 Continue" response before sending the request body. If that response does not arrive, the request body is not sent.
// For example, a POST request sends its headers first and sends its body only after receiving 100 Continue.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
// Flush the request so that the request headers are sent
exchange.flushRequest()
// Read the response headers
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
// responseBuilder is still null, so no final response has been read yet: write the request body
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// For a duplex request body, send the request headers first and the request body later
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
// Write the request body
requestBody.writeTo(bufferedRequestBody)
} else {
// The "Expect: 100-continue" expectation was met (or was never set), so write the request body
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
··· Omit code ···
// Finish the request so that everything written so far is sent
exchange.finishRequest()
··· Omit code ···
try {
if (responseBuilder == null) {
// Read the response headers
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
··· Omit code ···
// Build the response
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
··· Omit code ···
return response
··· Omit code ···

In short: write and send the request headers, then write and send the request body if the conditions are met, and finish the request. After that, parse the response headers returned by the server, build a new response, and return it. CallServerInterceptor is the last interceptor in the chain of responsibility, so it does not call chain.proceed(). Instead, it builds the response and passes it back up through each interceptor in the chain.
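
The rule for whether the request body is written can be condensed into one function. The following plain Java sketch uses hypothetical names (RequestBodySketch, sendsBody) and a plain int for the status code that the server sends after seeing only the headers; it models the decision described above and is not OkHttp code.

```java
// Hypothetical model of the request-body decision in CallServerInterceptor; not OkHttp code.
final class RequestBodySketch {
    // GET and HEAD requests carry no body.
    static boolean permitsRequestBody(String method) {
        return !(method.equals("GET") || method.equals("HEAD"));
    }

    // earlyStatus is the status the server answered with after receiving only the headers.
    // It matters only when the request carries "Expect: 100-continue".
    static boolean sendsBody(String method, boolean hasBody, String expectHeader, int earlyStatus) {
        if (!permitsRequestBody(method) || !hasBody) return false;
        if ("100-continue".equalsIgnoreCase(expectHeader)) {
            // Any status other than 100 is treated as the final response, so the body is skipped.
            return earlyStatus == 100;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(sendsBody("GET", true, null, 0));
        System.out.println(sendsBody("POST", true, null, 0));
        System.out.println(sendsBody("POST", true, "100-continue", 100));
        System.out.println(sendsBody("POST", true, "100-continue", 417));
    }
}
```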

[Figure: interceptor flow chart]

Summary

We have analyzed the request flow, covering both synchronous and asynchronous requests, and examined each interceptor in the chain of responsibility in detail. The flowchart below summarizes everything; you can follow it to walk through the whole process again.

[Figure: complete flow chart]

Reflection

Design patterns

  1. Builder pattern: OkHttpClient, Request and Response all use the builder pattern. These classes have many parameters, and users pick the ones they need to build the instance they want, which is why the builder pattern is so common in open source libraries.
  2. Factory method pattern: helps create complex objects, for example OkHttpClient.newCall(request) to create a Call object.
  3. Chain of responsibility pattern: this one is used to great effect. Seven interceptors form the interceptor chain and execute in order from top to bottom. Once the Response is obtained, it is passed back from bottom to top.

Thread safety

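The callsPerHost field of the AsyncCall class is declared with @Volatile and holds an AtomicInteger. The volatile reference makes a reassignment of the field visible to other threads, and the AtomicInteger makes each update of the count atomic, which keeps the field thread-safe under multithreading.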

inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
// Number of calls to the same host; @Volatile plus AtomicInteger ensures visibility and atomicity under multithreading
@Volatile var callsPerHost = AtomicInteger(0)
private set
... Omit code ...
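
The combination can be tried out in isolation. The sketch below is plain Java with hypothetical names (CallsPerHostSketch, FakeAsyncCall, countConcurrently): two call objects share one AtomicInteger through a volatile field, several threads increment it, and no increment is lost. It only illustrates the two Java mechanisms and makes no claim about how the dispatcher uses the counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a volatile reference to a shared AtomicInteger; not OkHttp code.
final class CallsPerHostSketch {
    static final class FakeAsyncCall {
        // volatile: a reassignment of the reference is visible to other threads
        private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

        AtomicInteger callsPerHost() {
            return callsPerHost;
        }

        // Makes this call share the counter of another call to the same host.
        void reuseCallsPerHostFrom(FakeAsyncCall other) {
            this.callsPerHost = other.callsPerHost;
        }
    }

    static int countConcurrently(int threads, int incrementsPerThread) {
        FakeAsyncCall first = new FakeAsyncCall();
        FakeAsyncCall second = new FakeAsyncCall();
        second.reuseCallsPerHostFrom(first);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            FakeAsyncCall call = (i % 2 == 0) ? first : second;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    call.callsPerHost().incrementAndGet(); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return first.callsPerHost().get();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 1000));
    }
}
```

Replacing the AtomicInteger with a plain int field and `++` would allow lost updates, because `++` is a separate read and write.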

Data structure

Why are readyAsyncCalls, runningAsyncCalls and runningSyncCalls stored in ArrayDeque?

There are two reasons. First, they all store network requests, and these requests are served on a first come, first served basis, so a queue is used. Second, according to the code, when enqueue is executed, readyAsyncCalls is traversed and every Call that meets the execution conditions is added to runningAsyncCalls. Compared with a linked list, an array-backed structure is more efficient to traverse, so ArrayDeque is used.
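
The traversal described in the second point can be sketched with two ArrayDeque instances. This is a simplified model in plain Java with hypothetical names (PromoteSketch, promote): each call is represented only by its host name, and the two limits are passed in as parameters (OkHttp's Dispatcher defaults are 64 requests overall and 5 per host).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of moving calls from the ready queue to the running queue; not OkHttp code.
final class PromoteSketch {
    static List<String> promote(ArrayDeque<String> ready, ArrayDeque<String> running,
                                int maxRequests, int maxRequestsPerHost) {
        // Count the calls that are already running for each host.
        Map<String, Integer> perHost = new HashMap<>();
        for (String host : running) perHost.merge(host, 1, Integer::sum);

        List<String> promoted = new ArrayList<>();
        Iterator<String> it = ready.iterator();
        while (it.hasNext()) {
            String host = it.next();
            if (running.size() >= maxRequests) break;                          // overall limit reached
            if (perHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // host limit reached, keep waiting
            it.remove();                                                       // leave the ready queue
            running.add(host);                                                 // join the running queue
            perHost.merge(host, 1, Integer::sum);
            promoted.add(host);
        }
        return promoted;
    }

    public static void main(String[] args) {
        ArrayDeque<String> ready = new ArrayDeque<>(List.of("a.com", "a.com", "a.com", "b.com"));
        ArrayDeque<String> running = new ArrayDeque<>();
        System.out.println(promote(ready, running, 3, 2));
        System.out.println(ready);
    }
}
```

Calls are examined in arrival order, and a call that is blocked by its host limit is skipped without blocking calls to other hosts behind it.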

Ending

That concludes this source code analysis of OkHttp.

In fact, the best way to learn source code is to clone it yourself and then, starting from how the library is used, follow the flow step by step.

The main purpose of sharing this article is to have someone point out my mistakes. If you find something wrong, please point it out without reservation so that I can learn from it. Also, if you think the article is good and it helped you, please give it a like as encouragement. Thank you. Peace!
original text : https://juejin.cn/post/7033307467199021086
copyright:author[Set a small goal first],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/01/202201261756000895.html