浅谈chuck-lua中的多线程

chuck-lua支持actor模式的线程模型.可以通过cthread.new创建线程,然后通过cthread.sendmail向线程发送消息.

与skynet这种框架不同,chuck-lua并不提供多线程的任务/消息调度功能,每个线程维护了一个简单的线程邮箱,用于缓存其它线程发过来的消息.

下面看一个简单的多线程服务器示例:

mtserver.lua

local chuck = require("chuck")
local engine = require("distri.engine")
local socket_helper = chuck.socket_helper
local Distri = require("distri.distri")
local cthread = chuck.cthread

local worker 

function on_new_client(fd)
  cthread.sendmail(worker,{fd})
end

local fd = socket_helper.socket(socket_helper.AF_INET,
                 socket_helper.SOCK_STREAM,
                 socket_helper.IPPROTO_TCP)
socket_helper.addr_reuse(fd,1)

local ip = "127.0.0.1"
local port = 8010

if 0 == socket_helper.listen(fd,ip,port) then
  print("server start",ip,port)
  local server = chuck.acceptor(fd)
  server:Add2Engine(engine,on_new_client)
  Distri.Signal(chuck.signal.SIGINT,Distri.Stop) 
  worker = cthread.new("distri/test/worker.lua")
  Distri.Run()
end

worker.lua

local chuck = require("chuck")
local socket = require("distri.socket")
local engine = require("distri.engine")
local clone  = chuck.packet.clone
local cthread = chuck.cthread
local Distri = require("distri.distri")

--设置邮件处理函数
cthread.process_mail(engine,function (sender,mail)
  local fd = table.unpack(mail)
  local s = socket.stream.New(fd)
  if s:Ok(4096,socket.stream.decoder.rawpacket(),function (_,msg,errno)
    if msg then
      s:Send(clone(msg))
    else
      s:Close(errno)
      s = nil
    end
  end) then
    s:SetRecvTimeout(5000)
  else
    s:Close()
  end 
end)

Distri.Run()

这个示例很简单,主线程启动监听,创建一个线程,当接收到连接时就将fd发送给worker线程.

在这里需要简单介绍一下chuck-lua线程相关的一些细节.

因为各线程跑在独立的虚拟机上,因此无法直接通过消息的方式将一个虚拟机中的对象发送到另一个线程中.目前sendmail将作为消息传递给它的lua table序列化为一种适合传输的对象,目标线程接收这个对象之后再重新转化成本线程虚拟机中的lua table.目前消息支持lua中的所有基本类型,但为了安全考虑,不支持直接传递userdata类型.

用cthread.sendmail向目标线程发送消息时,如果到达目标邮箱的缓冲上线将会阻塞.所有期望处理邮件消息的线程都必须调用cthread.process_mail设定消息回调函数.如果不设定,将可能导致消息发送线程永久阻塞.

线程使用join模式创建,创建者可以通过cthread.join等待线程的结束.

以上所述就是本文的全部内容了,希望大家能够喜欢。