mirror of
https://github.com/NeoFlock/neonucleus.git
synced 2025-09-24 09:03:32 +02:00
321 lines
8.9 KiB
Lua
321 lines
8.9 KiB
Lua
local pipe = require("pipe")
|
|
local event = require("event")
|
|
local process = require("process")
|
|
local computer = require("computer")
|
|
|
|
-- Key of the parentless (root) process in process.list. It is assigned by the
-- setup block at the bottom of this file and used by box_thread:detach().
local init_thread

-- Module table: the public thread API is attached to it below.
local thread = {}
|
|
|
|
--- Block the caller until the given threads are finished or a timeout expires.
--
-- A thread counts as finished when its status is anything other than
-- "running" (a suspended thread is treated as dead for this purpose), or when
-- the process it is attached to has recorded a failed result.
--
-- Every thread that was seen as finished is killed before returning, including
-- when the overall wait ends in a timeout.
--
-- The threads are always inspected at least once, so a zero (or already
-- expired) timeout acts as a non-blocking poll instead of unconditionally
-- reporting a timeout. They are also re-inspected after the final event.pull,
-- so a thread that finishes right at the deadline is not missed.
--
-- @param threads table   sequence of thread objects
-- @param timeout number|nil  seconds to wait; nil waits indefinitely
-- @param all boolean  true: wait for every thread; false: wait for any one
-- @return true on success, or nil and "thread join timed out"
local function waitForDeath(threads, timeout, all)
  checkArg(1, threads, "table")
  checkArg(2, timeout, "number", "nil")
  checkArg(3, all, "boolean")
  timeout = timeout or math.huge

  local mortician = {} -- set of threads to kill once the wait is over
  local timed_out = true
  local deadline = computer.uptime() + timeout

  while true do
    local dying = 0
    local living = false
    for _, t in ipairs(threads) do
      local mt = getmetatable(t)
      local result = mt.attached.data.result
      -- a table result whose first entry is falsy marks a failed process
      local proc_ok = type(result) ~= "table" or result[1]
      local ready_to_die = t:status() ~= "running" -- suspended is considered dead to exit
        or not proc_ok -- the thread is killed if its attached process has a non zero exit
      if ready_to_die then
        dying = dying + 1
        mortician[t] = true
      else
        living = true
      end
    end

    if all and not living or not all and dying > 0 then
      timed_out = false
      break
    end

    -- Only sleep while there is time left. Written as "not (x > 0)" so that a
    -- NaN timeout ends the wait, exactly as the comparison against the
    -- deadline did before.
    local remaining = deadline - computer.uptime()
    if not (remaining > 0) then
      break
    end

    -- The waited-on threads are resumed from event handlers while this call
    -- is blocked in event.pull.
    event.pull(remaining)
  end

  for t in pairs(mortician) do
    t:kill()
  end

  if timed_out then
    return nil, "thread join timed out"
  end
  return true
end
|
|
|
|
--- Wait until at least one of `threads` has finished.
-- @param threads table  sequence of thread objects
-- @param timeout number|nil  seconds to wait; nil waits without a limit
-- @return whatever waitForDeath returns: true, or nil plus an error message
function thread.waitForAny(threads, timeout)
  local require_all = false
  return waitForDeath(threads, timeout, require_all)
end
|
|
|
|
--- Wait until every one of `threads` has finished.
-- @param threads table  sequence of thread objects
-- @param timeout number|nil  seconds to wait; nil waits without a limit
-- @return whatever waitForDeath returns: true, or nil plus an error message
function thread.waitForAll(threads, timeout)
  local require_all = true
  return waitForDeath(threads, timeout, require_all)
end
|
|
|
|
-- Method table shared by all thread objects; thread.create installs it as the
-- __index of each thread's metatable.
local box_thread = {}
|
|
|
|
--- Mark a suspended thread as running again and schedule it to wake up.
-- @return true on success, or nil and a message if the thread is not suspended
function box_thread:resume()
  local meta = getmetatable(self)
  if meta.__status ~= "suspended" then
    return nil, "cannot resume " .. meta.__status .. " thread"
  end

  meta.__status = "running"

  -- Schedule a wake-up only if the thread's root coroutine is parked and no
  -- wake-up handler is currently registered for it.
  local root_is_parked = coroutine.status(self.pco.root) == "suspended"
  if root_is_parked and not meta.reg then
    meta.register(0)
  end

  return true
end
|
|
|
|
--- Mark a running thread as suspended.
-- If the call is made while the thread's own root coroutine is active, the
-- thread parks itself right away via the metatable's coma() routine.
-- @return true on success, or nil and a message if the thread is not running
function box_thread:suspend()
  local meta = getmetatable(self)
  if meta.__status ~= "running" then
    return nil, "cannot suspend " .. meta.__status .. " thread"
  end

  meta.__status = "suspended"

  -- "running" or "normal" means the root coroutine is on the active call chain
  local root_state = coroutine.status(self.pco.root)
  local root_is_active = root_state == "running" or root_state == "normal"
  if root_is_active then
    meta.coma()
  end

  return true
end
|
|
|
|
--- Report the thread's state as stored in its metatable.
-- Values assigned in this file are "running", "suspended" and "dead".
-- @return the current __status value
function box_thread:status()
  local meta = getmetatable(self)
  return meta.__status
end
|
|
|
|
--- Wait for this thread to finish.
-- @param timeout number|nil  seconds to wait; nil waits without a limit
-- @return whatever waitForDeath returns: true, or nil plus an error message
function box_thread:join(timeout)
  local just_me = { self }
  return waitForDeath(just_me, timeout, true)
end
|
|
|
|
--- Terminate the thread by invoking the close routine stored on its metatable.
-- Returns nothing.
function box_thread:kill()
  local meta = getmetatable(self)
  meta.close()
end
|
|
|
|
--- Re-parent the thread onto the init (root) process.
-- @return the result of self:attach(init_thread): the thread itself, or nil
--         plus an error message
function box_thread:detach()
  -- init_thread is the process.list key of the parentless process
  local new_parent = init_thread
  return self:attach(new_parent)
end
|
|
|
|
--- Attach the thread to a process, moving it away from its current one.
-- @param parent  passed to process.info to look up the target process
-- @return the thread itself, or nil and a message when no process is found
function box_thread:attach(parent)
  local target = process.info(parent)
  local meta = assert(getmetatable(self), "thread panic: no metadata")
  if not target then
    return nil, "thread failed to attach, process not found"
  end

  local previous = meta.attached
  if previous == target then
    return self -- nothing to do, already attached there
  end

  -- Leave the old parent. The wake-up handler lives in the attached
  -- process's handler table, so it has to be taken out before re-parenting.
  local pending
  if previous then
    pending = meta.unregister()
    process.removeHandle(self, previous)
  end

  -- closing the handle now means joining the thread
  self.close = self.join

  -- become a handle of the new parent
  meta.attached = target
  process.addHandle(self, target)

  -- If the thread was waiting for an event, register the wake-up again on
  -- the new parent with whatever time is left.
  if pending then
    meta.register(pending.timeout - computer.uptime())
  end

  return self
end
|
|
|
|
--- Find the thread object the caller is running in.
-- The first process visited supplies the root of its coroutine handler. Each
-- process above it in the parent chain is then searched for a handle whose
-- coroutine stack has that same root.
-- @return the matching thread handle, or nothing when none is found
function thread.current()
  local root
  local proc = process.findProcess()
  while proc do
    if not root then
      -- remember the coroutine root, then look for its owner further up
      root = proc.data.coroutine_handler.root
    else
      for _, candidate in ipairs(proc.data.handles) do
        local stack = candidate.pco
        if stack and stack.root == root then
          return candidate
        end
      end
    end
    proc = proc.parent
  end
end
|
|
|
|
--- Create a thread running `fp` and start it immediately.
-- The thread is attached to the current process, resumed once with the extra
-- arguments, and then returned.
-- @param fp function  body of the thread
-- @param ...          arguments handed to `fp` on its first resume
-- @return the new thread object
function thread.create(fp, ...)
  checkArg(1, fp, "function")

  local mt = { __status = "suspended", __index = box_thread }
  local t = setmetatable({}, mt)

  -- Turn a failed run of `fp` into an exit of the thread. `msg` is the error
  -- value and may be a custom error object carrying `reason` and `code`.
  -- This mirrors the error handling in process.lua.
  local function exit_after_crash(msg)
    local exit_code
    local reason = "crashed"
    if type(msg) == "string" then
      reason = msg
    elseif type(msg) == "table" then
      if type(msg.reason) == "string" then
        reason = msg.reason
      end
      exit_code = tonumber(msg.code)
    end
    if not exit_code then
      -- no usable exit code: report the failure and exit with 1
      pcall(event.onError, string.format("[thread] %s", reason))
      exit_code = 1
    end
    os.exit(exit_code)
  end

  t.pco = pipe.createCoroutineStack(function(...)
    mt.__status = "running"
    local worker = t.pco.create(fp)
    -- Drive `worker` until it is dead. A pullSignal made by the worker yields
    -- past this function. A plain yield comes back here, so the pull is made
    -- from here on the worker's behalf.
    local inbound = table.pack(...)
    while true do
      local outcome = table.pack(t.pco.resume(worker, table.unpack(inbound, 1, inbound.n)))
      if t.pco.status(worker) == "dead" then
        if not outcome[1] then
          exit_after_crash(outcome[2])
        end
        break
      end
      -- the yielded values become the arguments of the pull
      inbound = table.pack(event.pull(table.unpack(outcome, 2, outcome.n)))
    end
  end, nil, "thread")

  -- Resume wrapper that also notices when the thread has died.
  mt.private_resume = function(...)
    mt.unregister()
    -- the thread may have been killed in the meantime
    if t:status() == "dead" then
      return
    end
    local yielded = table.pack(t.pco.resume(t.pco.root, ...))
    if t.pco.status(t.pco.root) == "dead" then
      mt.close()
    end
    return table.unpack(yielded, 1, yielded.n)
  end

  mt.process = process.list[t.pco.root]
  mt.process.data.handlers = {}

  -- Register a one-shot handler, in the attached process's handler table,
  -- that resumes this thread on any event or after `timeout`.
  mt.register = function(timeout)
    local registry = mt.attached.data.handlers
    mt.id = event.register(
      nil,               -- nil key matches anything, timers use false keys
      mt.private_resume, -- callback that wakes the thread
      timeout,           -- wait for the time specified by the caller
      1,                 -- this thread should wake up only once
      registry)          -- optional arg: use our own handler table
    mt.reg = registry[mt.id]
  end

  -- Forget the current wake-up handler. It is removed from the handler table
  -- and returned only if the slot still holds our registration.
  mt.unregister = function()
    local id, reg = mt.id, mt.reg
    mt.id, mt.reg = nil, nil
    if not id then
      return
    end
    local registry = mt.attached.data.handlers
    if registry[id] ~= reg then
      return
    end
    registry[id] = nil
    return reg
  end

  -- Park the thread for as long as it is marked suspended.
  mt.coma = function()
    mt.unregister() -- no wake-up call until somebody resumes us
    while mt.__status == "suspended" do
      t.pco.yield_past(t.pco.root, 0)
    end
  end

  --[==[
    Signal pull used while running inside this thread.

    yield_past(root) yields all the way out of the thread, and the
    registration made just before it installs the callback that resumes the
    thread again.

    If the thread was suspended in the meantime it goes into a coma after
    waking up: it keeps yielding without any registration. t:resume()
    registers a wake-up call, which ends the coma. Those later yields need no
    timeout, because the thread had already been legitimately resumed before
    it discovered that it was suspended.

    Three places register a wake-up:
      1. computer.pullSignal [this path]
      2. t:attach(proc), which unregisters and registers again
      3. t:resume() of a suspended thread
  ]==]
  mt.process.data.pull = function(_, timeout)
    mt.register(timeout)
    local signal = table.pack(t.pco.yield_past(t.pco.root, timeout))
    mt.coma()
    return table.unpack(signal, 1, signal.n)
  end

  -- Mark the thread dead and drop it from its process. "thread_exit" is
  -- pushed only on the transition into the dead state.
  mt.close = function()
    local was_dead = t:status() == "dead"
    mt.__status = "dead"
    process.removeHandle(t, mt.attached)
    if not was_dead then
      event.push("thread_exit")
    end
  end

  t:attach() -- no argument: the current process
  mt.private_resume(...) -- threads start out running

  return t
end
|
|
|
|
-- One-time setup: make event.handlers and the signal pull per-process.
-- The event library puts a metatable on its handlers table; this block marks
-- that metatable with threaded=true so the setup is not repeated.
do
  local handlers = event.handlers
  local meta = getmetatable(handlers)
  if not meta.threaded then
    -- locate the root process, i.e. the one without a parent
    local root_data
    for list_key, proc in pairs(process.list) do
      if not proc.parent then
        init_thread = list_key
        root_data = proc.data
        break
      end
    end
    assert(init_thread, "thread library panic: no init thread")
    meta.threaded = true

    -- Root handlers are kept apart from thread handlers. Otherwise events
    -- would be dispatched twice, since a thread dispatches on pull as well.
    root_data.handlers = {}
    root_data.pull = meta.__call -- the real computer.pullSignal

    -- move every existing handler over to the root process
    local key, value = next(handlers)
    while key do
      root_data.handlers[key] = value
      handlers[key] = nil
      key, value = next(handlers)
    end

    -- From here on, every access to event.handlers is redirected to the data
    -- of whichever process is current at the time of the access.
    local function current_data()
      return process.info().data
    end

    meta.__index = function(_, name)
      return current_data().handlers[name]
    end

    meta.__newindex = function(_, name, handler)
      current_data().handlers[name] = handler
    end

    meta.__pairs = function(_, ...)
      return pairs(current_data().handlers, ...)
    end

    meta.__call = function(tbl, ...)
      return current_data().pull(tbl, ...)
    end
  end
end
|
|
|
|
return thread
|