Because I want to use this for work on Monday and I don’t feel like writing myself an email atm.
Optionally blocking writer (sender) and reader (receiver) via a named pipe in Python.
Feel free to use it – it’s hereby CC0-licensed.
import os
from time import sleep
import sys
Q_NAME = "a_q"
BLOCKING = sys.argv[1] == "-b" if len(sys.argv) > 1 else False
OPEN_FLAGS = os.O_WRONLY
if not BLOCKING:
OPEN_FLAGS |= os.O_NONBLOCK
print("writer start")
if not os.path.exists(Q_NAME):
os.mkfifo(Q_NAME)
for i in range(9999999999):
try:
q = os.open(Q_NAME, OPEN_FLAGS)
print(f"sending {i}")
os.write(q, f"test {i}\n".encode())
except OSError as e:
print(f"no listener, discarding {i}")
sleep(1)
import os
import sys
import select
from time import sleep
Q_NAME = "a_q"
BLOCKING = sys.argv[1] == "-b" if len(sys.argv) > 1 else False
OPEN_FLAGS = os.O_RDONLY
if not BLOCKING:
OPEN_FLAGS |= os.O_NONBLOCK
print("reader start")
if not os.path.exists(Q_NAME):
os.mkfifo(Q_NAME)
while True:
q = os.open(Q_NAME, OPEN_FLAGS)
while True:
rlist, _, _ = select.select([q], [], [], 2)
if rlist:
data = os.read(q, 1024).decode().strip()
if len(data) == 0:
print("stream closed")
break
else:
print(data)
else:
print("no input")
sleep(1)