package main

import (
	"bytes"
	"encoding/xml"
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
	"runtime"
	"sync"

	"github.com/pkg/errors"
)

const dumpFile = "/home/naitik/Downloads/enwiki-20210101-pages-articles-multistream.xml.bz2"
const chunkSize = 10 * 1024 * 1024 // 10MB
const initialWindow = 1024 * 1024  // 1MB
const maxChunks = 1000

var bufPool = &sync.Pool{
	New: func() interface{} {
		return &bytes.Buffer{}
	},
}

var numWorkers = runtime.NumCPU()

var startPageTag = []byte("<page>")
var endPageTag = []byte("</page>")

type Page struct {
	ID    uint64 `xml:"id"`
	Title string `xml:"title"`
}

// consume decodes every <page> element in r and prints it.
func consume(r io.Reader) error {
	xmlDecoder := xml.NewDecoder(r)
	var p Page
	for {
		t, err := xmlDecoder.Token()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return errors.WithStack(err)
		}
		if se, ok := t.(xml.StartElement); ok && se.Name.Local == "page" {
			if err := xmlDecoder.DecodeElement(&p, &se); err != nil {
				return errors.WithStack(err)
			}
			fmt.Println(p)
		}
	}
}

// worker drains the work channel, parsing each chunk and returning its
// buffer to the pool for reuse.
func worker(wg *sync.WaitGroup, work chan *bytes.Buffer) {
	defer wg.Done()
	for buf := range work {
		if err := consume(buf); err != nil {
			log.Printf("%+v\n", err)
		}
		buf.Reset()
		bufPool.Put(buf)
	}
}

func run() error {
	// Decompress with pbzcat (parallel bzip2) and stream its stdout.
	cmd := exec.Command("pbzcat", dumpFile)
	cmd.Stderr = os.Stderr
	bzr, err := cmd.StdoutPipe()
	if err != nil {
		return errors.WithStack(err)
	}
	defer bzr.Close()
	if err := cmd.Start(); err != nil {
		return errors.WithStack(err)
	}
	// We stop after maxChunks, so kill pbzcat rather than drain it.
	defer cmd.Process.Kill()

	work := make(chan *bytes.Buffer)
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go worker(&wg, work)
	}

	// Skip the <siteinfo> preamble by scanning an initial window for
	// the first <page> tag.
	leftover := make([]byte, initialWindow)
	n, err := io.ReadAtLeast(bzr, leftover, initialWindow)
	if err != nil {
		return errors.WithStack(err)
	}
	start := bytes.Index(leftover, startPageTag)
	if start == -1 {
		return errors.New("too small initial size?")
	}
	leftover = leftover[start:n]

	total := int64(n)
	chunks := 0
	defer func() {
		log.Println("bytes read", total)
		log.Println("chunks processed", chunks)
	}()

	for {
		// Build a chunk: start with the partial page carried over from
		// the previous chunk, then fill up to chunkSize from the stream.
		buf := bufPool.Get().(*bytes.Buffer)
		buf.Grow(chunkSize)
		buf.Write(leftover)
		pending := chunkSize - int64(len(leftover))
		leftover = leftover[:0]
		n, err := io.CopyN(buf, bzr, pending)
		if err != nil {
			log.Println("on error copied", n)
			log.Println("on error pending", pending)
			return errors.WithStack(err)
		}
		total += n

		// Cut the chunk at the last complete </page>; the tail after it
		// is carried over into the next chunk.
		contents := buf.Bytes()
		index := bytes.LastIndex(contents, endPageTag)
		if index == -1 {
			return errors.New("chunk size too small?")
		}
		index += len(endPageTag)
		leftover = append(leftover, contents[index:]...)
		buf.Truncate(index)

		work <- buf
		chunks++
		if chunks == maxChunks {
			break
		}
	}

	close(work)
	wg.Wait()
	return nil
}

func main() {
	log.SetOutput(os.Stderr)
	log.SetPrefix(">> ")
	log.SetFlags(0)
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "%+v\n", err)
		os.Exit(1)
	}
}