forgot to commit openos
This commit is contained in:
230
data/openos/lib/pipe.lua
Normal file
230
data/openos/lib/pipe.lua
Normal file
@@ -0,0 +1,230 @@
|
||||
local process = require("process")
|
||||
local shell = require("shell")
|
||||
local buffer = require("buffer")
|
||||
local command_result_as_code = require("sh").internal.command_result_as_code
|
||||
|
||||
local pipe = {}
|
||||
local _root_co = assert(process.info(), "process metadata failed to load").data.coroutine_handler
|
||||
|
||||
-- root can be a coroutine or a function
|
||||
function pipe.createCoroutineStack(root, env, name)
|
||||
checkArg(1, root, "thread", "function")
|
||||
|
||||
if type(root) == "function" then
|
||||
root = assert(process.load(root, env, nil, name or "pipe"), "failed to load proc data for given function")
|
||||
end
|
||||
|
||||
local proc = assert(process.list[root], "coroutine must be a process thread else the parent process is corrupted")
|
||||
|
||||
local pco = setmetatable({root=root}, {__index=_root_co})
|
||||
proc.data.coroutine_handler = pco
|
||||
|
||||
function pco.yield(...)
|
||||
return _root_co.yield(nil, ...)
|
||||
end
|
||||
function pco.yield_past(co, ...)
|
||||
return _root_co.yield(co, ...)
|
||||
end
|
||||
function pco.resume(co, ...)
|
||||
checkArg(1, co, "thread")
|
||||
local args = table.pack(...)
|
||||
while true do -- for consecutive sysyields
|
||||
local result = table.pack(_root_co.resume(co, table.unpack(args, 1, args.n)))
|
||||
local target = result[2] == true and pco.root or result[2]
|
||||
if not result[1] or _root_co.status(co) == "dead" then
|
||||
return table.unpack(result, 1, result.n)
|
||||
elseif target and target ~= co then
|
||||
args = table.pack(_root_co.yield(table.unpack(result, 2, result.n)))
|
||||
else
|
||||
return true, table.unpack(result, 3, result.n)
|
||||
end
|
||||
end
|
||||
end
|
||||
return pco
|
||||
end
|
||||
|
||||
local pipe_stream =
|
||||
{
|
||||
continue = function(self, exit)
|
||||
local result = table.pack(coroutine.resume(self.next))
|
||||
while true do -- repeat resumes if B (A|B) makes a natural yield
|
||||
-- if B crashed or closed in the last resume
|
||||
-- then we can close the stream
|
||||
if coroutine.status(self.next) == "dead" then
|
||||
self:close()
|
||||
-- always cause os.exit when the pipe closes
|
||||
-- this is very important
|
||||
-- e.g. cat very_large_file | head
|
||||
-- when head is done, cat should stop
|
||||
result[1] = nil
|
||||
end
|
||||
-- the pipe closed or crashed
|
||||
if not result[1] then
|
||||
if exit then
|
||||
os.exit(command_result_as_code(result[2]))
|
||||
end
|
||||
return self
|
||||
end
|
||||
-- next is suspended, read_mode indicates why
|
||||
if self.read_mode then
|
||||
-- B wants A to write again, resume A
|
||||
return self
|
||||
end
|
||||
-- not reading, it is requesting a yield
|
||||
-- yield_past(true) will exit this coroutine stack
|
||||
result = table.pack(coroutine.yield_past(true, table.unpack(result, 2, result.n)))
|
||||
result = table.pack(coroutine.resume(self.next, table.unpack(result, 1, result.n))) -- the request was for an event
|
||||
end
|
||||
end,
|
||||
close = function(self)
|
||||
self.closed = true
|
||||
if coroutine.status(self.next) == "suspended" then
|
||||
self:continue()
|
||||
end
|
||||
end,
|
||||
seek = function()
|
||||
return nil, "bad file descriptor"
|
||||
end,
|
||||
write = function(self, value)
|
||||
if self.closed then
|
||||
-- if next is dead, ignore all writes
|
||||
if coroutine.status(self.next) ~= "dead" then
|
||||
io.stderr:write("attempt to use a closed stream\n")
|
||||
os.exit(1)
|
||||
end
|
||||
else
|
||||
self.buffer = self.buffer .. value
|
||||
return self:continue(true)
|
||||
end
|
||||
os.exit(0) -- abort the current process: SIGPIPE
|
||||
end,
|
||||
read = function(self, n)
|
||||
if self.closed then
|
||||
return nil -- eof
|
||||
end
|
||||
if self.buffer == "" then
|
||||
-- the pipe_stream write resume is waiting on this process B (A|B) to yield
|
||||
-- yield here requests A to output again. However, B may elsewhere want a
|
||||
-- natural yield (i.e. for events). To differentiate this yield from natural
|
||||
-- yields we set read_mode here, which the pipe_stream write detects
|
||||
self.read_mode = true
|
||||
coroutine.yield_past(self.next) -- next is the first croutine in this stack
|
||||
self.read_mode = false
|
||||
end
|
||||
local result = string.sub(self.buffer, 1, n)
|
||||
self.buffer = string.sub(self.buffer, n + 1)
|
||||
return result
|
||||
end
|
||||
}
|
||||
|
||||
-- prog1 | prog2 | ... | progn
|
||||
function pipe.buildPipeChain(progs)
|
||||
local chain = {}
|
||||
local prev_piped_stream
|
||||
for i=1,#progs do
|
||||
local thread = progs[i]
|
||||
-- A needs to be a stack in case any thread in A call write and then B natural yields
|
||||
-- B needs to be a stack in case any thread in B calls read
|
||||
pipe.createCoroutineStack(thread)
|
||||
chain[i] = thread
|
||||
local proc = process.info(thread)
|
||||
local pio = proc.data.io
|
||||
|
||||
local piped_stream
|
||||
if i < #progs then
|
||||
local handle = setmetatable({buffer = ""}, {__index = pipe_stream})
|
||||
process.addHandle(handle, proc)
|
||||
piped_stream = buffer.new("rw", handle)
|
||||
piped_stream:setvbuf("no", 1024)
|
||||
pio[1] = piped_stream
|
||||
end
|
||||
|
||||
if prev_piped_stream then
|
||||
prev_piped_stream.stream.next = thread
|
||||
pio[0] = prev_piped_stream
|
||||
end
|
||||
|
||||
prev_piped_stream = piped_stream
|
||||
end
|
||||
|
||||
return chain
|
||||
end
|
||||
|
||||
local chain_stream =
|
||||
{
|
||||
read = function(self, value, ...)
|
||||
if self.io_stream.closed then return nil end
|
||||
-- wake up prog
|
||||
self.ready = false -- the pipe proc sets this true when ios completes
|
||||
local ret = table.pack(coroutine.resume(self.pco.root, value, ...))
|
||||
if coroutine.status(self.pco.root) == "dead" then
|
||||
return nil
|
||||
elseif not ret[1] then
|
||||
return table.unpack(ret, 1, ret.n)
|
||||
end
|
||||
if not self.ready then
|
||||
-- prog yielded back without writing/reading
|
||||
return self:read(coroutine.yield())
|
||||
end
|
||||
return ret[2]
|
||||
end,
|
||||
write = function(self, ...)
|
||||
return self:read(...)
|
||||
end,
|
||||
close = function(self)
|
||||
self.io_stream:close()
|
||||
end,
|
||||
}
|
||||
|
||||
function pipe.popen(prog, mode, env)
|
||||
mode = mode or "r"
|
||||
if mode ~= "r" and mode ~= "w" then
|
||||
return nil, "bad argument #2: invalid mode " .. tostring(mode) .. " must be r or w"
|
||||
end
|
||||
|
||||
local r = mode == "r"
|
||||
|
||||
local chain = {}
|
||||
-- to simplify the code - shell.execute is run within a function to pass (prog, env)
|
||||
-- if cmd_proc were to come second (mode=="w") then the pipe_proc would have to pass
|
||||
-- the starting args. which is possible, just more complicated
|
||||
local cmd_proc = process.load(function() return shell.execute(prog, env) end, nil, nil, prog)
|
||||
|
||||
-- the chain stream is the popen controller
|
||||
local stream = setmetatable({}, { __index = chain_stream })
|
||||
|
||||
-- the stream needs its own process for io
|
||||
local pipe_proc = process.load(function()
|
||||
local n = r and 0 or ""
|
||||
local key = r and "read" or "write"
|
||||
local ios = stream.io_stream
|
||||
while not ios.closed do
|
||||
-- read from pipe
|
||||
local ret = table.pack(ios[key](ios, n))
|
||||
stream.ready = true
|
||||
-- yield outside the chain now
|
||||
n = coroutine.yield_past(chain[1], table.unpack(ret, 1, ret.n))
|
||||
end
|
||||
end, nil, nil, "pipe_handler")
|
||||
|
||||
chain[r and 1 or 2] = cmd_proc
|
||||
chain[r and 2 or 1] = pipe_proc
|
||||
|
||||
-- link the cmd and pipe proc io
|
||||
pipe.buildPipeChain(chain)
|
||||
local cmd_data = process.info(chain[1]).data
|
||||
local cmd_stack = cmd_data.coroutine_handler
|
||||
|
||||
-- store handle to io_stream from easy access later
|
||||
stream.io_stream = cmd_data.io[1].stream
|
||||
stream.pco = cmd_stack
|
||||
|
||||
-- popen commands start out running, like threads
|
||||
cmd_stack.resume(cmd_stack.root)
|
||||
|
||||
local buffered_stream = buffer.new(mode, stream)
|
||||
buffered_stream:setvbuf("no", 1024)
|
||||
return buffered_stream
|
||||
end
|
||||
|
||||
return pipe
|
||||
Reference in New Issue
Block a user