Bubble Tea TUI for tailing Claude Code subagent JSONL logs. Features: - Tabbed interface with one tab per agent, sorted by most recent activity - Markdown rendering via glamour (Dracula theme) - Session discovery and filtering with -s flag - Auto-discovers subagents dir from session ID prefix - Live tailing with follow mode, mouse scroll support - Agent name resolution from team config files - Tail-from-bottom: only reads last 200 lines per file on startup
335 lines
6.3 KiB
Go
335 lines
6.3 KiB
Go
package main
|
|
|
|
import (
	"bufio"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
)
|
|
|
|
// EventMsg is sent to the Bubble Tea model when a new event is parsed.
|
|
type EventMsg struct {
|
|
Event Event
|
|
File string
|
|
}
|
|
|
|
// InitialEventsMsg is sent once with all tail events at startup.
|
|
type InitialEventsMsg struct {
|
|
Events []Event
|
|
}
|
|
|
|
// NewFileMsg is sent when a new JSONL file is discovered.
type NewFileMsg struct {
	// File identifies the newly discovered JSONL file.
	File string
}
|
|
|
|
// WatcherErrorMsg is sent when the watcher encounters an error.
type WatcherErrorMsg struct {
	// Err is the error reported by the watcher.
	Err error
}
|
|
|
|
// defaultTailLines is how many lines are read from the end of each existing
// file on startup.
const defaultTailLines = 200
|
|
|
|
// Watcher watches a directory for JSONL files and tails them.
|
|
type Watcher struct {
|
|
dir string
|
|
eventCh chan EventMsg
|
|
done chan struct{}
|
|
|
|
// Initial batch of events from tail, sent once
|
|
initialEvents []Event
|
|
initialDone chan struct{}
|
|
|
|
mu sync.Mutex
|
|
tailing map[string]struct{}
|
|
fsw *fsnotify.Watcher
|
|
}
|
|
|
|
// NewWatcher creates a new directory watcher.
|
|
func NewWatcher(dir string) *Watcher {
|
|
return &Watcher{
|
|
dir: dir,
|
|
eventCh: make(chan EventMsg, 256),
|
|
done: make(chan struct{}),
|
|
initialDone: make(chan struct{}),
|
|
tailing: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
// Start begins watching and tailing.
|
|
func (w *Watcher) Start() error {
|
|
var err error
|
|
w.fsw, err = fsnotify.NewWatcher()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := w.fsw.Add(w.dir); err != nil {
|
|
w.fsw.Close()
|
|
return err
|
|
}
|
|
|
|
// Read tail of existing files synchronously to build initial batch
|
|
matches, _ := filepath.Glob(filepath.Join(w.dir, "*.jsonl"))
|
|
var allInitial []Event
|
|
fileOffsets := make(map[string]int64) // track where we left off
|
|
|
|
for _, path := range matches {
|
|
events, endOffset := readTail(path, defaultTailLines)
|
|
allInitial = append(allInitial, events...)
|
|
fileOffsets[path] = endOffset
|
|
}
|
|
|
|
// Sort by timestamp
|
|
sort.Slice(allInitial, func(i, j int) bool {
|
|
return allInitial[i].parsed.time.Before(allInitial[j].parsed.time)
|
|
})
|
|
w.initialEvents = allInitial
|
|
|
|
// Now start tailing from where we left off (for new events only)
|
|
for _, path := range matches {
|
|
w.mu.Lock()
|
|
w.tailing[path] = struct{}{}
|
|
w.mu.Unlock()
|
|
go w.tailFileFrom(path, fileOffsets[path])
|
|
}
|
|
|
|
close(w.initialDone)
|
|
|
|
// Watch for new files
|
|
go w.watchLoop()
|
|
|
|
return nil
|
|
}
|
|
|
|
// InitialEvents returns the batch of events read from tails at startup.
|
|
// Only valid after Start() returns.
|
|
func (w *Watcher) InitialEvents() []Event {
|
|
return w.initialEvents
|
|
}
|
|
|
|
// readTail reads the last n lines from a JSONL file and returns parsed events
|
|
// plus the byte offset at EOF (for continuing to tail from).
|
|
func readTail(path string, n int) ([]Event, int64) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return nil, 0
|
|
}
|
|
defer f.Close()
|
|
|
|
offset := findTailOffset(f, n)
|
|
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
|
return nil, 0
|
|
}
|
|
|
|
reader := bufio.NewReader(f)
|
|
|
|
// If we seeked to mid-file, skip the first partial line
|
|
if offset > 0 {
|
|
_, _ = reader.ReadString('\n')
|
|
}
|
|
|
|
var events []Event
|
|
for {
|
|
line, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
break
|
|
}
|
|
line = strings.TrimSpace(line)
|
|
if line == "" {
|
|
continue
|
|
}
|
|
var evt Event
|
|
if err := json.Unmarshal([]byte(line), &evt); err != nil {
|
|
continue
|
|
}
|
|
parseEvent(&evt)
|
|
events = append(events, evt)
|
|
}
|
|
|
|
// Get current file position (EOF)
|
|
endOffset, err := f.Seek(0, io.SeekEnd)
|
|
if err != nil {
|
|
endOffset = 0
|
|
}
|
|
|
|
return events, endOffset
|
|
}
|
|
|
|
func (w *Watcher) watchLoop() {
|
|
defer w.fsw.Close()
|
|
|
|
for {
|
|
select {
|
|
case <-w.done:
|
|
return
|
|
case event, ok := <-w.fsw.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
if event.Op&(fsnotify.Create|fsnotify.Write) != 0 {
|
|
if strings.HasSuffix(event.Name, ".jsonl") {
|
|
w.startTail(event.Name)
|
|
}
|
|
}
|
|
case _, ok := <-w.fsw.Errors:
|
|
if !ok {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *Watcher) startTail(path string) {
|
|
w.mu.Lock()
|
|
if _, ok := w.tailing[path]; ok {
|
|
w.mu.Unlock()
|
|
return
|
|
}
|
|
w.tailing[path] = struct{}{}
|
|
w.mu.Unlock()
|
|
|
|
// New file discovered after startup — tail from beginning
|
|
go w.tailFileFrom(path, 0)
|
|
}
|
|
|
|
// tailFileFrom tails a file starting from the given byte offset.
|
|
func (w *Watcher) tailFileFrom(path string, startOffset int64) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer f.Close()
|
|
|
|
if startOffset > 0 {
|
|
if _, err := f.Seek(startOffset, io.SeekStart); err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
basename := filepath.Base(path)
|
|
reader := bufio.NewReader(f)
|
|
|
|
for {
|
|
select {
|
|
case <-w.done:
|
|
return
|
|
default:
|
|
}
|
|
|
|
line, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
w.waitForChange(path, f, reader)
|
|
continue
|
|
}
|
|
return
|
|
}
|
|
|
|
line = strings.TrimSpace(line)
|
|
if line == "" {
|
|
continue
|
|
}
|
|
|
|
var evt Event
|
|
if err := json.Unmarshal([]byte(line), &evt); err != nil {
|
|
continue
|
|
}
|
|
parseEvent(&evt)
|
|
|
|
select {
|
|
case w.eventCh <- EventMsg{Event: evt, File: basename}:
|
|
case <-w.done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// findTailOffset returns a byte offset from which reading f yields
// approximately its last n lines.
//
// The file is scanned backwards in fixed-size chunks while counting newline
// bytes. As soon as more than n newlines have been seen, the position just
// past that newline is returned. It returns 0 (start of file) when the file
// is empty, holds n or fewer newlines, or cannot be stat'ed or read.
func findTailOffset(f *os.File, n int) int64 {
	const chunkSize = 8192

	st, err := f.Stat()
	if err != nil || st.Size() == 0 {
		return 0
	}

	scratch := make([]byte, chunkSize)
	seen := 0

	// end is the exclusive upper bound of the region not yet scanned.
	for end := st.Size(); end > 0; {
		start := end - chunkSize
		if start < 0 {
			start = 0
		}

		got, rerr := f.ReadAt(scratch[:end-start], start)
		if rerr != nil && rerr != io.EOF {
			return 0
		}

		for idx := got - 1; idx >= 0; idx-- {
			if scratch[idx] != '\n' {
				continue
			}
			seen++
			if seen > n {
				return start + int64(idx) + 1
			}
		}

		end = start
	}

	return 0
}
|
|
|
|
// waitForChange blocks until the file has new data.
|
|
func (w *Watcher) waitForChange(path string, f *os.File, reader *bufio.Reader) {
|
|
fw, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer fw.Close()
|
|
|
|
if err := fw.Add(path); err != nil {
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-w.done:
|
|
return
|
|
case evt, ok := <-fw.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
if evt.Op&fsnotify.Write != 0 {
|
|
return
|
|
}
|
|
case <-fw.Errors:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop stops the watcher.
|
|
func (w *Watcher) Stop() {
|
|
close(w.done)
|
|
}
|
|
|
|
// Events returns the channel that emits parsed events.
|
|
func (w *Watcher) Events() <-chan EventMsg {
|
|
return w.eventCh
|
|
}
|