关于 Cosocket 的 socket busy 报错

原文转载自 「火丁笔记」 ( https://blog.huoding.com/2020/01/15/795 ) By 老王

预计阅读时间 0 分钟(共 0 个字, 0 张图片, 0 个链接)

关于 OpenResty 的 cosocket,文档里有如下一段描述:

the cosocket object here is full-duplex, that is, a reader “light thread” and a writer “light thread” can operate on a single cosocket object simultaneously (both “light threads” must belong to the same Lua handler though, see reasons above). But you cannot have two “light threads” both reading (or writing or connecting) the same cosocket, otherwise you might get an error like “socket busy reading” when calling the methods of the cosocket object.

简单点儿说,cosocket 是全双工的,如果同一个 lua handler 有一个读线程和一个写线程的话,那么它们可以同时操作一个 cosocket 对象,但是如果两个线程一起读或者写一个 cosocket 对象的话,那么会触发「socket busy」错误。

测试需要,我用「nc -l 1111」命令启动了一个 TCP 服务,监听 1111 端口,如果手头没有 linux 环境,不能使用 nc 命令的话,那么你随便用某个网址的 80 端口也是一样的。

首先让我们编程复现一下「socket busy」错误,代码逻辑很简单,就是让两个线程对同一个 cosocket 一起发出写操作。通过 resty 运行如下代码:

local sock = ngx.socket.tcp()
sock:connect("127.0.0.1", 1111) -- shell: nc -l 1111

local data = {}

for i = 1, 1024 do
    data[i] = "data"
end

data = table.concat(data) .. "\n"

local function test(worker)
    for i = 1, 9999 do
        ngx.log(ngx.ERR, worker, ": ", i)

        local _, err = sock:send(data)
        -- ngx.sleep(0)

        if err then
            ngx.log(ngx.ERR, worker, ": ", i, " err: ", err)
            break
        end
    end
end

local a = ngx.thread.spawn(test, "a")
local b = ngx.thread.spawn(test, "b")

ngx.thread.wait(a, b)

ngx.thread.kill(a)
ngx.thread.kill(b)

结果如下,确实出现了错误「socket busy」:

并发出错

并发出错

我在做实验的时候遇到了两个问题需要说明一下:

  • 问题一:测试数据(本例中 data 为 4k)最好大一点,否则可能无法复现错误。
  • 问题二:从结果看,线程 a 运行了几百次后,线程 b 才开始运行,也就是说线程 a 得到了 CPU 就不愿意撒手,此时可以通过 ngx.sleep(0) 主动交出 CPU 控制权。

接下来看看如何解决「socket busy」错误,既然出现「socket busy」错误的原因是多线程一起读或者写同一个 cosocket 对象,那我们只要加一把锁让操作串行就行了,不过需要注意的是,这里不要通过 lua-resty-lock 来加锁,而应该通过 semaphore 来加锁,这是因为 lua-resty-lock 的控制粒度比较粗,适合请求在多个 worker 时的情况,而 semaphore 的控制粒度比较细,适合请求在单个 worker 时的情况。通过 resty 运行如下代码:

local semaphore = require "ngx.semaphore"
local sema = semaphore.new()

local sock = ngx.socket.tcp()
sock:connect("127.0.0.1", 1111) -- shell: nc -l 1111

local data = {}

for i = 1, 1024 do
    data[i] = "data"
end

data = table.concat(data) .. "\n"

local function test(worker)
    for i = 1, 9999 do
        ngx.log(ngx.ERR, worker, ": ", i)

        local ok, _ = sema:wait(1)

        if not ok then
            break
        end

        local _, err = sock:send(data)
        sema:post()

        if err then
            ngx.log(ngx.ERR, worker, ": ", i, " err: ", err)
            break
        end
    end
end

local a = ngx.thread.spawn(test, "a")
local b = ngx.thread.spawn(test, "b")

sema:post()

ngx.thread.wait(a, b)

ngx.thread.kill(a)
ngx.thread.kill(b)

结果如下,你会发现请求完全执行完了,整个过程中没有出错:

并发未出错

并发未出错

以后使用 OpenResty 的时候,如果多个线程要同时读或者写同一个 cosocket 对象,那么切记要用 semaphore 控制一下,避免出现「socket busy」错误。当然了,最理想的情况是不用引入 semaphore,每个 cosocket 对象都有一个专门的读线程,一个专门的写线程,此时如果读线程需要写操作,可以考虑通过队列把写操作转给写线程去完成,如此一来既避免使用 semaphore,又充分发挥了全双工的效率。

more_vert