Commit 74ff0d9e authored by Rahul A. Sutar's avatar Rahul A. Sutar
Browse files

Downloader

Multithreaded, interruptible and resumable downloader commit
parent 2c78f0e4
Branches
Tags
2 merge requests!10Downloader,!7Downloader
Showing with 629 additions and 0 deletions
package coreos
import "coreospackage/mkcldownload"
//DownloadFile downloads the file at downloadURL into downloadLocation,
//using noOfThreads parallel connections when the server supports byte
//ranges and a single connection otherwise. It reports whether the
//download completed successfully.
func DownloadFile(downloadURL string, downloadLocation string, noOfThreads int) bool {
	url, resHeader := mkcldownload.GetFinalurl(downloadURL)
	if !mkcldownload.AcceptRanges(resHeader) {
		// Server does not advertise ranged requests; fall back to one connection.
		return mkcldownload.DownloadSingle(url)
	}
	mkcldownload.NoOfFiles = noOfThreads
	mkcldownload.DownloadLocation = downloadLocation
	return mkcldownload.Download(url, mkcldownload.GetContentLength(resHeader))
}
//ResumeFileDownload resumes a previously interrupted download of
//downloadURL whose part files live under downloadLocation. It reports
//whether resuming was possible and finished; servers without byte-range
//support cannot be resumed and yield false.
func ResumeFileDownload(downloadURL string, downloadLocation string) bool {
	mkcldownload.DownloadLocation = downloadLocation
	url, resHeader := mkcldownload.GetFinalurl(downloadURL)
	if !mkcldownload.AcceptRanges(resHeader) {
		// Without range support there is nothing we can resume.
		return false
	}
	return mkcldownload.Resume(url, mkcldownload.GetContentLength(resHeader))
}
package mkcldownload
import (
"flag"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
)
var (
	// Pointer aliases so the rest of the package accesses every setting
	// uniformly through a pointer, whether it came from a flag or was
	// assigned directly by a caller.
	noOfFiles = &NoOfFiles
	// NOTE(review): resume appears unused within this file — confirm
	// external callers before removing.
	resume = NewResume
	maxTryCount = MaxTryCount
	timeout = Timeout
	ovrdConnLimit = OvrdConnLimit
)
var (
	// NoOfFiles = flag.Int("n", 10, "number of parallel connection")
	// NoOfFiles is the number of parallel connections; set directly by
	// callers (e.g. coreos.DownloadFile) rather than parsed from a flag.
	NoOfFiles int
	// DownloadLocation is the directory where the final file and its
	// "temp" part-file directory are written.
	DownloadLocation string
	// Command-line flags: resume mode, connection retry attempts,
	// per-response timeout in seconds, and the override for the
	// 20-connection safety limit enforced by ValidateFlags.
	NewResume = flag.Bool("r", false, "resume pending download")
	MaxTryCount = flag.Int("m", 1, "maximum attempts to establish a connection")
	Timeout = flag.Int("t", 900, "maximum time in seconds it will wait to establish a connection")
	OvrdConnLimit = flag.Bool("N", false, "maximum connection is restricted to 20, to force more connection")
)
//Download downloads a file from a given url by creating parallel
//connections, each fetching an equal byte range (the last connection
//also takes the remainder). Part files are kept under
//DownloadLocation/temp/ until merged. It reports whether the whole
//download succeeded.
func Download(url string, length int) bool {
	partLength := length / *noOfFiles
	filename := getFilenameFromURL(url)
	filename = getFilename(filename)
	if _, err := os.Stat(DownloadLocation + "/temp/" + filename + "_0"); err == nil {
		// The original used log.Fatal here, which exits the process
		// before the caller could ever see the bool result; log and
		// return false instead.
		log.Println("Downloading has already started, resume downloading.")
		return false
	}
	// Create the temp directory once, up front; the original called
	// MkdirAll on every loop iteration and ignored its error.
	if err := os.MkdirAll(DownloadLocation+"/temp/", 0777); err != nil {
		log.Println(err)
		return false
	}
	if err := SetupLog(length, *noOfFiles); err != nil {
		log.Println(err)
		return false
	}
	for i := 0; i < *noOfFiles; i++ {
		byteStart := partLength * i
		byteEnd := byteStart + partLength
		if i == *noOfFiles-1 {
			// Last connection also covers the division remainder.
			byteEnd = length
		}
		createTempFile(DownloadLocation+"/temp/"+filename+"_"+strconv.Itoa(i), byteStart, byteEnd)
		wg.Add(1)
		go downloadPart(url, filename, i, byteStart, byteEnd)
	}
	wg.Wait()
	FinishLog()
	if !errorGoRoutine {
		mergeFiles(filename, *noOfFiles)
		clearFiles(filename, *noOfFiles)
		log.Println("download successful")
		return true
	}
	log.Println("download unsuccessful")
	return false
}
//Resume resumes an interrupted download by creating the same number of
//connections the original download used (recovered from the part-file
//headers on disk). It reports whether the download finished
//successfully.
func Resume(url string, length int) bool {
	filename := getFilenameFromURL(url)
	filename = getFilename(filename)
	*noOfFiles = noOfExistingConnection(filename, length)
	if *noOfFiles == 0 {
		return false
	}
	partLength := length / *noOfFiles
	if err := SetupResumeLog(filename, length, *noOfFiles); err != nil {
		// log.Fatal would exit the process before the caller saw the
		// bool result, so log and report failure instead.
		log.Println(err)
		return false
	}
	for i := 0; i < *noOfFiles; i++ {
		partFilename := DownloadLocation + "/temp/" + filename + "_" + strconv.Itoa(i)
		if _, err := os.Stat(partFilename); err != nil {
			// Part file missing entirely: download the whole part again.
			byteStart := partLength * i
			byteEnd := byteStart + partLength
			if i == *noOfFiles-1 {
				byteEnd = length
			}
			wg.Add(1)
			go downloadPart(url, filename, i, byteStart, byteEnd)
		} else {
			// Part file exists: continue from where it stopped.
			byteStart, byteEnd := readHeader(partFilename)
			if byteStart < byteEnd {
				wg.Add(1)
				go downloadPart(url, filename, i, byteStart, byteEnd)
			}
		}
	}
	wg.Wait()
	FinishLog()
	if !errorGoRoutine {
		mergeFiles(filename, *noOfFiles)
		clearFiles(filename, *noOfFiles)
		log.Println("download successful")
		return true
	}
	log.Println("download unsuccessful")
	// The original returned true even on failure; report false so the
	// caller (and Download's contract) stay consistent.
	return false
}
//DownloadSingle downloads a file from a given url over a single
//connection and writes it to the current working directory. It reports
//whether the download succeeded.
//NOTE(review): unlike Download, this ignores DownloadLocation and does
//not de-duplicate existing filenames via getFilename — confirm intended.
func DownloadSingle(url string) bool {
	filename := getFilenameFromURL(url)
	client := &http.Client{}
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		// The original discarded this error with req, _ := ...
		log.Println(err)
		return false
	}
	resp, err := client.Do(req)
	if err != nil {
		// log.Fatal would exit before the caller saw the bool result.
		log.Println(err)
		return false
	}
	// Close the body so the underlying connection can be reused; the
	// original leaked it.
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Println(err)
		return false
	}
	if err := ioutil.WriteFile(filename, body, 0666); err != nil {
		log.Println(err)
		return false
	}
	return true
}
package mkcldownload
import (
"errors"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"
)
//httpResponse pairs an *http.Response with the error from client.Do so
//both can travel through a channel as a single value.
type httpResponse struct {
	resp *http.Response
	err error
}
//PACKETLENGTH is size of each packet in bytes
const PACKETLENGTH = 32000
// wg tracks the per-part download goroutines started by Download/Resume.
var wg sync.WaitGroup
// errorGoRoutine is set to true by any part goroutine that fails.
// NOTE(review): failing goroutines write this concurrently without
// synchronization; it is only read after wg.Wait(), but the racing
// writes would still be flagged by the race detector — confirm acceptable.
var errorGoRoutine bool
// downloadPacket performs one ranged request and writes the packet into
// the part file, failing with a manual-timeout error if no response
// arrives within *timeout seconds.
func downloadPacket(client *http.Client, req *http.Request, partFilename string, byteStart, byteEnd int) error {
	// Buffered so the request goroutine can always deliver its result
	// and exit, even after we stop listening.
	c := make(chan httpResponse, 1)
	go func() {
		resp, err := client.Do(req)
		c <- httpResponse{resp, err}
	}()
	select {
	case res := <-c:
		return handleResponse(res, partFilename, byteStart, byteEnd)
	case <-time.After(time.Second * time.Duration(*timeout)):
		// Drain the channel in the background and close the late
		// response body; the original leaked the connection whenever
		// the response arrived after the timeout fired.
		go func() {
			if res := <-c; res.resp != nil {
				res.resp.Body.Close()
			}
		}()
		// NOTE: this exact text (including the "recieved" typo) is
		// matched by downloadPacketWithRetry; keep them in sync.
		return errors.New("Manual time out as response not recieved")
	}
}
// handleResponse unpacks one HTTP result: it propagates any transport
// error, reads the full response body, and appends it to the part file.
func handleResponse(res httpResponse, partFilename string, byteStart, byteEnd int) error {
	if res.err != nil {
		return res.err
	}
	defer res.resp.Body.Close()
	body, err := ioutil.ReadAll(res.resp.Body)
	if err != nil {
		return err
	}
	return writeBytes(partFilename, body, byteStart, byteEnd)
}
// downloadPacketWithRetry attempts a packet download up to *maxTryCount
// times, retrying only on the manual-timeout error; any other error is
// returned immediately.
func downloadPacketWithRetry(client *http.Client, req *http.Request, partFilename string, byteStart, byteEnd int) error {
	var lastErr error
	for attempt := 0; attempt < *maxTryCount; attempt++ {
		lastErr = downloadPacket(client, req, partFilename, byteStart, byteEnd)
		switch {
		case lastErr == nil:
			return nil
		case lastErr.Error() == "Manual time out as response not recieved":
			// Only our own timeout is worth another attempt.
			continue
		default:
			return lastErr
		}
	}
	return lastErr
}
// downloadPart fetches the byte range [byteStart, byteEnd) of url in
// PACKETLENGTH-sized ranged requests, appending each packet to the part
// file "<filename>_<index>" and updating the progress log. Runs as a
// goroutine; it releases its WaitGroup slot on completion (the error
// path delegates that to handleErrorInGoRoutine).
func downloadPart(url, filename string, index, byteStart, byteEnd int) {
	client := &http.Client{}
	partFilename := filename + "_" + strconv.Itoa(index)
	noofpacket := (byteEnd-byteStart+1)/PACKETLENGTH + 1
	for i := 0; i < noofpacket; i++ {
		packetStart := byteStart + i*PACKETLENGTH
		packetEnd := packetStart + PACKETLENGTH
		if i == noofpacket-1 {
			packetEnd = byteEnd
		}
		if packetStart >= packetEnd {
			// The part length was an exact multiple of PACKETLENGTH, so
			// the final packet is empty; requesting it would send an
			// inverted Range header (e.g. "bytes=100-99") and fail.
			break
		}
		// Range is inclusive on both ends, hence packetEnd-1.
		rangeHeader := "bytes=" + strconv.Itoa(packetStart) + "-" + strconv.Itoa(packetEnd-1)
		req, _ := http.NewRequest("GET", url, nil)
		req.Header.Add("Range", rangeHeader)
		// byteStart/byteEnd (the whole part's bounds) are passed so a
		// missing part file can be recreated with the correct header.
		err := downloadPacketWithRetry(client, req, partFilename, byteStart, byteEnd)
		if err != nil {
			handleErrorInGoRoutine(index, err)
			return
		}
		UpdateStat(index, packetStart, packetEnd)
	}
	wg.Done()
}
// handleErrorInGoRoutine records a failed part download: it logs the
// error against the connection via ReportErrorStat, flags the overall
// download as failed, and releases this goroutine's WaitGroup slot.
func handleErrorInGoRoutine(index int, err error) {
	ReportErrorStat(index, err, *noOfFiles)
	// NOTE(review): concurrent failing goroutines all write true here
	// without synchronization; the flag is only read after wg.Wait(),
	// but the racing writes would trip the race detector — confirm
	// acceptable.
	errorGoRoutine = true
	wg.Done()
}
package mkcldownload
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"time"
)
// createTempFile writes a fresh part file containing only the 16-byte
// header: two little-endian int64 values giving the part's begin and
// end offsets. Downloaded payload bytes are appended after this header.
func createTempFile(partFilename string, fileBegin, fileEnd int) {
	var buf bytes.Buffer
	for _, offset := range []int64{int64(fileBegin), int64(fileEnd)} {
		if err := binary.Write(&buf, binary.LittleEndian, offset); err != nil {
			log.Fatal(err)
		}
	}
	if err := ioutil.WriteFile(partFilename, buf.Bytes(), 0666); err != nil {
		log.Fatal(err)
	}
}
// writeBytes appends a downloaded packet to the part file inside the
// temp directory, creating the directory and the part file (with its
// 16-byte offset header) on first use.
func writeBytes(partFilename string, reader []byte, byteStart, byteEnd int) error {
	if err := os.MkdirAll(DownloadLocation+"/temp/", 0777); err != nil {
		return err
	}
	path := DownloadLocation + "/temp/" + partFilename
	if _, err := os.Stat(path); err != nil {
		createTempFile(path, byteStart, byteEnd)
	}
	file, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		return err
	}
	defer file.Close()
	// Write the bytes directly; the original round-tripped through
	// string(reader), copying the packet an extra time.
	if _, err = file.Write(reader); err != nil {
		return err
	}
	return nil
}
// readHeader reads a part file's 16-byte header and returns the resume
// position and end offset. The first 8 bytes hold the part's original
// begin offset; everything after the header is already-downloaded
// payload, so the resume position is begin + payload length.
func readHeader(partFilename string) (int, int) {
	data, err := ioutil.ReadFile(partFilename)
	if err != nil {
		log.Fatal(err)
	}
	begin := int(binary.LittleEndian.Uint64(data[0:8]))
	end := int(binary.LittleEndian.Uint64(data[8:16]))
	return begin + len(data) - 16, end
}
// mergeFiles concatenates the payload (everything after the 16-byte
// header) of each part file, in order, into a uniquely named temp file,
// then renames it to the final filename inside DownloadLocation.
func mergeFiles(filename string, count int) {
	tempFilename := strconv.Itoa(time.Now().Nanosecond()) + "_" + filename
	fmt.Println("temp file name : " + tempFilename)
	fmt.Println("file name : " + filename)
	// Open the destination once; the original reopened it every
	// iteration and leaked the handles via defer-in-loop.
	out, err := os.OpenFile(DownloadLocation+"/"+tempFilename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()
	for i := 0; i < count; i++ {
		partFilename := DownloadLocation + "/temp/" + filename + "_" + strconv.Itoa(i)
		reader, err := ioutil.ReadFile(partFilename)
		// Check the error BEFORE slicing; the original sliced
		// reader[16:] first and would panic on a failed read.
		if err != nil {
			log.Fatal(err)
		}
		if _, err = out.Write(reader[16:]); err != nil {
			log.Fatal(err)
		}
	}
	if err := os.Rename(DownloadLocation+"/"+tempFilename, DownloadLocation+"/"+filename); err != nil {
		// The original ignored the rename error entirely.
		log.Fatal(err)
	}
}
// isDirEmpty reports whether the directory name contains no entries.
func isDirEmpty(name string) (bool, error) {
	dir, err := os.Open(name)
	if err != nil {
		return false, err
	}
	defer dir.Close()
	// Ask for a single entry; io.EOF means the directory holds nothing.
	if _, err = dir.Readdir(1); err == io.EOF {
		return true, nil
	}
	return false, err
}
// clearFiles deletes every temp part file belonging to filename and
// removes the temp directory itself once it is empty.
func clearFiles(filename string, count int) {
	for i := 0; i < count; i++ {
		os.Remove(DownloadLocation + "/temp/" + filename + "_" + strconv.Itoa(i))
	}
	empty, err := isDirEmpty(DownloadLocation + "/temp/")
	if err != nil {
		log.Fatal(err)
	}
	if empty {
		os.Remove(DownloadLocation + "/temp/")
	}
}
// noOfExistingConnection recovers how many connections the original
// download used by reading the first part file's header: the header
// stores the part's begin/end offsets, so length / (end-begin) is the
// connection count. Exits via log.Fatal when there is nothing to resume.
func noOfExistingConnection(filename string, length int) int {
	existingFilename := DownloadLocation + "/temp/" + filename + "_0"
	// The original performed a second, redundant os.Stat after this one
	// (and an unreachable "return 0"): log.Fatal already exits on error.
	if _, err := os.Stat(existingFilename); err != nil {
		log.Fatal("No file to resume downloading")
	}
	reader, err := ioutil.ReadFile(existingFilename)
	if err != nil {
		log.Fatal(err)
	}
	if len(reader) < 16 {
		log.Fatal("No file to resume downloading")
	}
	header := reader[:16]
	interval := int(binary.LittleEndian.Uint64(header[8:16])) - int(binary.LittleEndian.Uint64(header[0:8]))
	if interval == 0 {
		log.Fatal("No file to resume downloading")
	}
	return length / interval
}
// getFilename returns a filename that does not collide with an existing
// file, appending "(1)", "(2)", ... until an unused name is found.
// Note: the suffix lands after any extension, e.g. "a.zip" -> "a.zip(1)".
func getFilename(filename string) string {
	for j := 0; ; j++ {
		if j == 1 {
			filename += "(1)"
		} else if j > 1 {
			// NOTE(review): Replace rewrites the FIRST "(j-1)" occurrence,
			// which could touch the wrong spot if the original name already
			// contained such a marker — confirm acceptable.
			filename = strings.Replace(filename, "("+strconv.Itoa(j-1)+")", "("+strconv.Itoa(j)+")", 1)
		}
		if _, err := os.Stat(filename); os.IsNotExist(err) {
			// Return the verified name directly. The original ran one more
			// Replace after the loop, which could return a name that was
			// never checked for existence.
			return filename
		}
	}
}
package mkcldownload
import (
"encoding/binary"
"io/ioutil"
"log"
"os"
"strconv"
"github.com/cheggaaa/pb"
)
//ConnectionLog keeps a log of all connections through progress bars:
//one ConnectionStat/bar per connection plus a combined total bar, all
//rendered together in a single pb.Pool.
type ConnectionLog struct {
	stats []ConnectionStat // per-connection state, indexed by connection
	pool *pb.Pool // the running pool rendering every bar
	totalbar *pb.ProgressBar // aggregate progress across all connections
}
//ConnectionStat keeps statistic of each connection
type ConnectionStat struct {
	connectionIndex int // zero-based connection index
	pbar *pb.ProgressBar // this connection's progress bar
	Err error // last error reported for this connection, if any
}
// connLog is the package-wide log shared by SetupLog, SetupResumeLog,
// UpdateStat, FinishLog and ReportErrorStat.
var connLog ConnectionLog
//SetupLog creates one progress bar per connection plus a combined
//"Total" bar and starts them all in a shared pool. length is the full
//content length in bytes; noOfConn is the number of connections.
func SetupLog(length, noOfConn int) error {
	connLog.stats = make([]ConnectionStat, noOfConn)
	bars := make([]*pb.ProgressBar, noOfConn+1)
	partLen := length / noOfConn
	for i := 0; i < noOfConn; i++ {
		from := partLen * i
		to := from + partLen
		if i == noOfConn-1 {
			// Last connection also covers the division remainder.
			to = length
		}
		connBar := pb.New(to - from).Prefix("Connection " + strconv.Itoa(i+1) + " ")
		customizeBar(connBar)
		connLog.stats[i] = ConnectionStat{connectionIndex: i, pbar: connBar}
		bars[i] = connBar
	}
	totalBar := pb.New(length).Prefix("Total ")
	customizeBar(totalBar)
	connLog.totalbar = totalBar
	bars[noOfConn] = totalBar
	var err error
	connLog.pool, err = pb.StartPool(bars...)
	if err != nil {
		return err
	}
	return nil
}
// customizeBar applies the shared bar appearance: byte units, counters
// and speed shown, no time-left estimate, width capped at 80 columns.
func customizeBar(bar *pb.ProgressBar) {
	bar.ShowCounters = true
	bar.ShowTimeLeft = false
	bar.ShowSpeed = true
	bar.SetMaxWidth(80)
	bar.SetUnits(pb.U_BYTES)
}
//SetupResumeLog sets up ConnectionLog for a resumed download: bars for
//existing part files are sized from their stored headers and
//pre-advanced by the payload already on disk; missing parts get fresh
//bars sized like SetupLog would.
func SetupResumeLog(filename string, length, noOfConn int) error {
	connLog.stats = make([]ConnectionStat, noOfConn)
	barArray := make([]*pb.ProgressBar, noOfConn+1)
	totalbar := pb.New(length).Prefix("Total ")
	lenSub := length / noOfConn
	for i := 0; i < noOfConn; i++ {
		partFilename := DownloadLocation + "/temp/" + filename + "_" + strconv.Itoa(i)
		if _, err := os.Stat(partFilename); err == nil {
			// Part file exists: recover its byte range from the header.
			reader, err := ioutil.ReadFile(partFilename)
			if err != nil {
				return err
			}
			// NOTE(review): assumes the part file is at least 16 bytes;
			// a truncated file would panic here — confirm upstream guards.
			header := reader[:16]
			fileBegin := int(binary.LittleEndian.Uint64(header[0:8]))
			fileEnd := int(binary.LittleEndian.Uint64(header[8:16]))
			bar := pb.New(fileEnd - fileBegin).Prefix("Connection " + strconv.Itoa(i+1) + " ")
			// Pre-advance by one Increment per already-downloaded byte
			// (file length minus the 16-byte header).
			for j := 0; j < len(reader)-16; j++ {
				bar.Increment()
				totalbar.Increment()
			}
			customizeBar(bar)
			connLog.stats[i] = ConnectionStat{connectionIndex: i, pbar: bar}
			barArray[i] = bar
		} else {
			// Part file missing: size the bar exactly like SetupLog does.
			fileBegin := lenSub * i
			fileEnd := lenSub * (i + 1)
			if i == noOfConn-1 {
				fileEnd = length
			}
			bar := pb.New(fileEnd - fileBegin).Prefix("Connection " + strconv.Itoa(i+1) + " ")
			customizeBar(bar)
			connLog.stats[i] = ConnectionStat{connectionIndex: i, pbar: bar}
			barArray[i] = bar
		}
	}
	customizeBar(totalbar)
	connLog.totalbar = totalbar
	barArray[noOfConn] = totalbar
	var err error
	connLog.pool, err = pb.StartPool(barArray...)
	if err != nil {
		return err
	}
	return nil
}
//UpdateStat advances connection i's progress bar (and the total bar)
//by the size of the packet [fileBegin, fileEnd).
func UpdateStat(i int, fileBegin int, fileEnd int) {
	// Add the whole packet size in one call; the original looped
	// Increment() once per byte — tens of thousands of atomic updates
	// per 32 KB packet.
	n := fileEnd - fileBegin
	connLog.stats[i].pbar.Add(n)
	connLog.totalbar.Add(n)
}
//FinishLog stops ConnectionLog pool
//so the terminal is released for normal logging output.
func FinishLog() {
	connLog.pool.Stop()
}
//ReportErrorStat records an error against connection i, stops the bar
//pool so the message renders cleanly, logs it, and then restarts the
//pool with the same bars so the remaining connections keep reporting.
func ReportErrorStat(i int, err error, noOfConn int) {
	connLog.stats[i].Err = err
	connLog.pool.Stop()
	log.Println()
	log.Println("Error in connection " + strconv.Itoa(i+1) + " : " + err.Error())
	log.Println()
	bars := make([]*pb.ProgressBar, noOfConn+1)
	for j := 0; j < noOfConn; j++ {
		bars[j] = connLog.stats[j].pbar
	}
	bars[noOfConn] = connLog.totalbar
	connLog.pool, _ = pb.StartPool(bars...)
}
package mkcldownload
type ReturnStatus func()
package mkcldownload
import (
"flag"
"log"
"net/http"
"os"
"strconv"
"strings"
)
//AcceptRanges reports whether the response headers advertise support
//for byte-range requests, i.e. whether any "Accept-Ranges" value is
//exactly "bytes".
func AcceptRanges(m http.Header) bool {
	for _, v := range m.Values("Accept-Ranges") {
		if v == "bytes" {
			return true
		}
	}
	return false
}
// getFilenameFromURL extracts the last path segment of url, dropping
// any query string that follows a '?'.
func getFilenameFromURL(url string) string {
	name := url[strings.LastIndex(url, "/")+1:]
	if q := strings.Index(name, "?"); q >= 0 {
		name = name[:q]
	}
	return name
}
//GetContentLength returns the Content-Length header value in bytes, or
//0 when the header is missing or not a valid integer.
func GetContentLength(m http.Header) int {
	// The original indexed m["Content-Length"][0] directly, which
	// panics when the header is absent, and ignored the Atoi error.
	length, err := strconv.Atoi(m.Get("Content-Length"))
	if err != nil {
		return 0
	}
	return length
}
// GetFinalurl issues a HEAD request for url and returns the final URL
// (after any redirects the client followed) together with the response
// headers. Exits via log.Fatal on transport errors.
func GetFinalurl(url string) (string, http.Header) {
	client := &http.Client{}
	res, err := client.Head(url)
	if err != nil {
		log.Fatal(err)
	}
	// Close the (empty) HEAD body so the connection can be reused;
	// the original leaked it on every call.
	defer res.Body.Close()
	responseURL := res.Request.URL.String()
	if responseURL != url {
		// Client.Head already followed redirects; this extra pass
		// re-validates against the final URL and then terminates.
		return GetFinalurl(responseURL)
	}
	return responseURL, res.Header
}
// ValidateFlags sanity-checks the command-line flag values and exits
// with usage output when any of them is invalid: all numeric options
// must be positive, and more than 20 connections requires the -N
// override.
func ValidateFlags() {
	fail := func(msg string) {
		log.Println(msg)
		flag.Usage()
		os.Exit(1)
	}
	if *noOfFiles <= 0 || *maxTryCount <= 0 || *timeout <= 0 {
		fail("Give a value greater than 0")
	}
	if *noOfFiles > 20 && !*ovrdConnLimit {
		fail("Connection limit restricted to 20, either use lower value or override using -N")
	}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment