local process = require("process")
local shell = require("shell")
local buffer = require("buffer")
local command_result_as_code = require("sh").internal.command_result_as_code

local pipe = {}
local _root_co = assert(process.info(), "process metadata failed to load").data.coroutine_handler

-- root can be a coroutine or a function
function pipe.createCoroutineStack(root, env, name)
  checkArg(1, root, "thread", "function")

  if type(root) == "function" then
    root = assert(process.load(root, env, nil, name or "pipe"), "failed to load proc data for given function")
  end

  local proc = assert(process.list[root], "coroutine must be a process thread else the parent process is corrupted")

  local pco = setmetatable({root=root}, {__index=_root_co})
  proc.data.coroutine_handler = pco

  -- every yield carries a target as its first value:
  --   nil          - a plain yield, handled by the nearest pco.resume
  --   a coroutine  - forwarded upward until the resume of that coroutine sees it
  --   true         - stands for the stack root, i.e. yields past this whole stack
  function pco.yield(...)
    return _root_co.yield(nil, ...)
  end
  function pco.yield_past(co, ...)
    return _root_co.yield(co, ...)
  end
  function pco.resume(co, ...)
    checkArg(1, co, "thread")
    local args = table.pack(...)
    while true do -- loop to handle consecutive system yields
      local result = table.pack(_root_co.resume(co, table.unpack(args, 1, args.n)))
      local target = result[2] == true and pco.root or result[2]
      if not result[1] or _root_co.status(co) == "dead" then
        return table.unpack(result, 1, result.n)
      elseif target and target ~= co then
        args = table.pack(_root_co.yield(table.unpack(result, 2, result.n)))
      else
        return true, table.unpack(result, 3, result.n)
      end
    end
  end
  return pco
end
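
-- Illustrative sketch (not part of the library): wrapping a function in a
-- coroutine stack lets its nested coroutines yield in a way the stack can
-- route. The wrapped function below is a made-up example.
--
--   local pco = pipe.createCoroutineStack(function()
--     coroutine.yield()    -- a plain yield, handled by pco.resume
--   end, nil, "example")
--   pco.resume(pco.root)   -- runs the wrapped process thread until it yields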

local pipe_stream =
{
  continue = function(self, exit)
    local result = table.pack(coroutine.resume(self.next))
    while true do -- repeat resumes if B (A|B) makes a natural yield
      -- if B crashed or closed in the last resume
      -- then we can close the stream
      if coroutine.status(self.next) == "dead" then
        self:close()
        -- always cause os.exit when the pipe closes
        -- this is very important
        -- e.g. cat very_large_file | head
        -- when head is done, cat should stop
        result[1] = nil
      end
      -- the pipe closed or crashed
      if not result[1] then
        if exit then
          os.exit(command_result_as_code(result[2]))
        end
        return self
      end
      -- next is suspended, read_mode indicates why
      if self.read_mode then
        -- B wants A to write again, resume A
        return self
      end
      -- not reading, it is requesting a yield
      -- yield_past(true) will exit this coroutine stack
      result = table.pack(coroutine.yield_past(true, table.unpack(result, 2, result.n)))
      result = table.pack(coroutine.resume(self.next, table.unpack(result, 1, result.n))) -- the request was for an event
    end
  end,
  close = function(self)
    self.closed = true
    if coroutine.status(self.next) == "suspended" then
      self:continue()
    end
  end,
  seek = function()
    return nil, "bad file descriptor"
  end,
  write = function(self, value)
    if self.closed then
      -- if next is dead, ignore all writes
      if coroutine.status(self.next) ~= "dead" then
        io.stderr:write("attempt to use a closed stream\n")
        os.exit(1)
      end
    else
      self.buffer = self.buffer .. value
      return self:continue(true)
    end
    os.exit(0) -- abort the current process: SIGPIPE
  end,
  read = function(self, n)
    if self.closed then
      return nil -- eof
    end
    if self.buffer == "" then
      -- the pipe_stream write resume is waiting on this process B (A|B) to yield
      -- yield here requests A to output again. However, B may elsewhere want a
      -- natural yield (i.e. for events). To differentiate this yield from natural
      -- yields we set read_mode here, which the pipe_stream write detects
      self.read_mode = true
      coroutine.yield_past(self.next) -- next is the first coroutine in this stack
      self.read_mode = false
    end
    local result = string.sub(self.buffer, 1, n)
    self.buffer = string.sub(self.buffer, n + 1)
    return result
  end
}

-- prog1 | prog2 | ... | progn
function pipe.buildPipeChain(progs)
  local chain = {}
  local prev_piped_stream
  for i=1,#progs do
    local thread = progs[i]
    -- A needs to be a stack in case any thread in A calls write and then B yields naturally
    -- B needs to be a stack in case any thread in B calls read
    pipe.createCoroutineStack(thread)
    chain[i] = thread
    local proc = process.info(thread)
    local pio = proc.data.io

    local piped_stream
    if i < #progs then
      local handle = setmetatable({buffer = ""}, {__index = pipe_stream})
      process.addHandle(handle, proc)
      piped_stream = buffer.new("rw", handle)
      piped_stream:setvbuf("no", 1024)
      pio[1] = piped_stream
    end

    if prev_piped_stream then
      prev_piped_stream.stream.next = thread
      pio[0] = prev_piped_stream
    end

    prev_piped_stream = piped_stream
  end

  return chain
end
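
-- Illustrative sketch (not part of the library): link two loaded process
-- threads the way the shell links `a | b`. The program bodies are made-up
-- placeholders.
--
--   local a = process.load(function() io.write("hello\n") end, nil, nil, "a")
--   local b = process.load(function() io.write(io.read() or "") end, nil, nil, "b")
--   local chain = pipe.buildPipeChain({a, b})
--   -- the shell then resumes chain[1]; a's pipe writes resume b, and when b
--   -- finishes, a is stopped (see pipe_stream.write above)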

local chain_stream =
{
  read = function(self, value, ...)
    if self.io_stream.closed then return nil end
    -- wake up prog
    self.ready = false -- the pipe proc sets this true when ios completes
    local ret = table.pack(coroutine.resume(self.pco.root, value, ...))
    if coroutine.status(self.pco.root) == "dead" then
      return nil
    elseif not ret[1] then
      return table.unpack(ret, 1, ret.n)
    end
    if not self.ready then
      -- prog yielded back without writing/reading
      return self:read(coroutine.yield())
    end
    return ret[2]
  end,
  write = function(self, ...)
    return self:read(...)
  end,
  close = function(self)
    self.io_stream:close()
  end,
}
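
-- Note: chain_stream is the raw handle that pipe.popen wraps with buffer.new
-- below. Each read (and write, which is an alias) resumes the wrapped command
-- until the pipe_handler process flags the transfer complete via self.ready.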

function pipe.popen(prog, mode, env)
  mode = mode or "r"
  if mode ~= "r" and mode ~= "w" then
    return nil, "bad argument #2: invalid mode " .. tostring(mode) .. " must be r or w"
  end

  local r = mode == "r"

  local chain = {}
  -- to simplify the code, shell.execute is run within a function to pass (prog, env)
  -- if cmd_proc were to come second (mode=="w") then the pipe_proc would have to pass
  -- the starting args, which is possible, just more complicated
  local cmd_proc = process.load(function() return shell.execute(prog, env) end, nil, nil, prog)

  -- the chain stream is the popen controller
  local stream = setmetatable({}, { __index = chain_stream })

  -- the stream needs its own process for io
  local pipe_proc = process.load(function()
    local n = r and 0 or ""
    local key = r and "read" or "write"
    local ios = stream.io_stream
    while not ios.closed do
      -- read from (or write to) the pipe, depending on mode
      local ret = table.pack(ios[key](ios, n))
      stream.ready = true
      -- yield outside the chain now
      n = coroutine.yield_past(chain[1], table.unpack(ret, 1, ret.n))
    end
  end, nil, nil, "pipe_handler")

  chain[r and 1 or 2] = cmd_proc
  chain[r and 2 or 1] = pipe_proc

  -- link the cmd and pipe proc io
  pipe.buildPipeChain(chain)
  local cmd_data = process.info(chain[1]).data
  local cmd_stack = cmd_data.coroutine_handler

  -- store a handle to io_stream for easy access later
  stream.io_stream = cmd_data.io[1].stream
  stream.pco = cmd_stack

  -- popen commands start out running, like threads
  cmd_stack.resume(cmd_stack.root)

  local buffered_stream = buffer.new(mode, stream)
  buffered_stream:setvbuf("no", 1024)
  return buffered_stream
end
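
-- Illustrative sketch (not part of the library): run a command and read its
-- output through the buffered stream returned by popen. The command string is
-- a made-up example.
--
--   local f = pipe.popen("echo hello", "r")
--   local line = f:read("*l") -- "hello"
--   f:close()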

return pipe