Initial commit: agent-tui
Bubble Tea TUI for tailing Claude Code subagent JSONL logs. Features: - Tabbed interface with one tab per agent, sorted by most recent activity - Markdown rendering via glamour (Dracula theme) - Session discovery and filtering with -s flag - Auto-discovers subagents dir from session ID prefix - Live tailing with follow mode, mouse scroll support - Agent name resolution from team config files - Tail-from-bottom: only reads last 200 lines per file on startup
This commit is contained in:
334
watcher.go
Normal file
334
watcher.go
Normal file
@@ -0,0 +1,334 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
// EventMsg is sent to the Bubble Tea model when a new event is parsed.
|
||||
type EventMsg struct {
|
||||
Event Event
|
||||
File string
|
||||
}
|
||||
|
||||
// InitialEventsMsg is sent once with all tail events at startup.
|
||||
type InitialEventsMsg struct {
|
||||
Events []Event
|
||||
}
|
||||
|
||||
// NewFileMsg is sent when a new JSONL file is discovered.
|
||||
type NewFileMsg struct {
|
||||
File string
|
||||
}
|
||||
|
||||
// WatcherErrorMsg is sent when the watcher encounters an error.
|
||||
type WatcherErrorMsg struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
// Default number of lines to read from the end of each file on startup.
|
||||
const defaultTailLines = 200
|
||||
|
||||
// Watcher watches a directory for JSONL files and tails them.
|
||||
type Watcher struct {
|
||||
dir string
|
||||
eventCh chan EventMsg
|
||||
done chan struct{}
|
||||
|
||||
// Initial batch of events from tail, sent once
|
||||
initialEvents []Event
|
||||
initialDone chan struct{}
|
||||
|
||||
mu sync.Mutex
|
||||
tailing map[string]struct{}
|
||||
fsw *fsnotify.Watcher
|
||||
}
|
||||
|
||||
// NewWatcher creates a new directory watcher.
|
||||
func NewWatcher(dir string) *Watcher {
|
||||
return &Watcher{
|
||||
dir: dir,
|
||||
eventCh: make(chan EventMsg, 256),
|
||||
done: make(chan struct{}),
|
||||
initialDone: make(chan struct{}),
|
||||
tailing: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins watching and tailing.
|
||||
func (w *Watcher) Start() error {
|
||||
var err error
|
||||
w.fsw, err = fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := w.fsw.Add(w.dir); err != nil {
|
||||
w.fsw.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Read tail of existing files synchronously to build initial batch
|
||||
matches, _ := filepath.Glob(filepath.Join(w.dir, "*.jsonl"))
|
||||
var allInitial []Event
|
||||
fileOffsets := make(map[string]int64) // track where we left off
|
||||
|
||||
for _, path := range matches {
|
||||
events, endOffset := readTail(path, defaultTailLines)
|
||||
allInitial = append(allInitial, events...)
|
||||
fileOffsets[path] = endOffset
|
||||
}
|
||||
|
||||
// Sort by timestamp
|
||||
sort.Slice(allInitial, func(i, j int) bool {
|
||||
return allInitial[i].parsed.time.Before(allInitial[j].parsed.time)
|
||||
})
|
||||
w.initialEvents = allInitial
|
||||
|
||||
// Now start tailing from where we left off (for new events only)
|
||||
for _, path := range matches {
|
||||
w.mu.Lock()
|
||||
w.tailing[path] = struct{}{}
|
||||
w.mu.Unlock()
|
||||
go w.tailFileFrom(path, fileOffsets[path])
|
||||
}
|
||||
|
||||
close(w.initialDone)
|
||||
|
||||
// Watch for new files
|
||||
go w.watchLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// InitialEvents returns the batch of events read from tails at startup.
|
||||
// Only valid after Start() returns.
|
||||
func (w *Watcher) InitialEvents() []Event {
|
||||
return w.initialEvents
|
||||
}
|
||||
|
||||
// readTail reads the last n lines from a JSONL file and returns parsed events
|
||||
// plus the byte offset at EOF (for continuing to tail from).
|
||||
func readTail(path string, n int) ([]Event, int64) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, 0
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
offset := findTailOffset(f, n)
|
||||
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
reader := bufio.NewReader(f)
|
||||
|
||||
// If we seeked to mid-file, skip the first partial line
|
||||
if offset > 0 {
|
||||
_, _ = reader.ReadString('\n')
|
||||
}
|
||||
|
||||
var events []Event
|
||||
for {
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
var evt Event
|
||||
if err := json.Unmarshal([]byte(line), &evt); err != nil {
|
||||
continue
|
||||
}
|
||||
parseEvent(&evt)
|
||||
events = append(events, evt)
|
||||
}
|
||||
|
||||
// Get current file position (EOF)
|
||||
endOffset, err := f.Seek(0, io.SeekEnd)
|
||||
if err != nil {
|
||||
endOffset = 0
|
||||
}
|
||||
|
||||
return events, endOffset
|
||||
}
|
||||
|
||||
func (w *Watcher) watchLoop() {
|
||||
defer w.fsw.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.done:
|
||||
return
|
||||
case event, ok := <-w.fsw.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if event.Op&(fsnotify.Create|fsnotify.Write) != 0 {
|
||||
if strings.HasSuffix(event.Name, ".jsonl") {
|
||||
w.startTail(event.Name)
|
||||
}
|
||||
}
|
||||
case _, ok := <-w.fsw.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Watcher) startTail(path string) {
|
||||
w.mu.Lock()
|
||||
if _, ok := w.tailing[path]; ok {
|
||||
w.mu.Unlock()
|
||||
return
|
||||
}
|
||||
w.tailing[path] = struct{}{}
|
||||
w.mu.Unlock()
|
||||
|
||||
// New file discovered after startup — tail from beginning
|
||||
go w.tailFileFrom(path, 0)
|
||||
}
|
||||
|
||||
// tailFileFrom tails a file starting from the given byte offset.
|
||||
func (w *Watcher) tailFileFrom(path string, startOffset int64) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if startOffset > 0 {
|
||||
if _, err := f.Seek(startOffset, io.SeekStart); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
basename := filepath.Base(path)
|
||||
reader := bufio.NewReader(f)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
w.waitForChange(path, f, reader)
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
var evt Event
|
||||
if err := json.Unmarshal([]byte(line), &evt); err != nil {
|
||||
continue
|
||||
}
|
||||
parseEvent(&evt)
|
||||
|
||||
select {
|
||||
case w.eventCh <- EventMsg{Event: evt, File: basename}:
|
||||
case <-w.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// findTailOffset returns a byte offset to start reading from to get
|
||||
// approximately the last n lines of the file.
|
||||
func findTailOffset(f *os.File, n int) int64 {
|
||||
info, err := f.Stat()
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
size := info.Size()
|
||||
if size == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
bufSize := int64(8192)
|
||||
newlines := 0
|
||||
pos := size
|
||||
|
||||
buf := make([]byte, bufSize)
|
||||
for pos > 0 {
|
||||
readSize := bufSize
|
||||
if pos < readSize {
|
||||
readSize = pos
|
||||
}
|
||||
pos -= readSize
|
||||
|
||||
nRead, err := f.ReadAt(buf[:readSize], pos)
|
||||
if err != nil && err != io.EOF {
|
||||
return 0
|
||||
}
|
||||
|
||||
for i := nRead - 1; i >= 0; i-- {
|
||||
if buf[i] == '\n' {
|
||||
newlines++
|
||||
if newlines > n {
|
||||
return pos + int64(i) + 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
// waitForChange blocks until the file has new data.
|
||||
func (w *Watcher) waitForChange(path string, f *os.File, reader *bufio.Reader) {
|
||||
fw, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer fw.Close()
|
||||
|
||||
if err := fw.Add(path); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.done:
|
||||
return
|
||||
case evt, ok := <-fw.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if evt.Op&fsnotify.Write != 0 {
|
||||
return
|
||||
}
|
||||
case <-fw.Errors:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the watcher.
|
||||
func (w *Watcher) Stop() {
|
||||
close(w.done)
|
||||
}
|
||||
|
||||
// Events returns the channel that emits parsed events.
|
||||
func (w *Watcher) Events() <-chan EventMsg {
|
||||
return w.eventCh
|
||||
}
|
||||
Reference in New Issue
Block a user