77e69b9cf3
Signed-off-by: Olivier Gambier <olivier@docker.com>
368 lines
9.6 KiB
Go
368 lines
9.6 KiB
Go
// Copyright 2013 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package main
|
|
|
|
import (
|
|
"container/list"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"math"
|
|
"math/rand"
|
|
"net/http"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
bigquery "google.golang.org/api/bigquery/v2"
|
|
storage "google.golang.org/api/storage/v1"
|
|
)
|
|
|
|
const (
|
|
GB = 1 << 30
|
|
MaxBackoff = 30000
|
|
BaseBackoff = 250
|
|
BackoffGrowthFactor = 1.8
|
|
BackoffGrowthDamper = 0.25
|
|
JobStatusDone = "DONE"
|
|
DatasetAlreadyExists = "Already Exists: Dataset"
|
|
TableWriteEmptyDisposition = "WRITE_EMPTY"
|
|
)
|
|
|
|
func init() {
|
|
scope := fmt.Sprintf("%s %s %s", bigquery.BigqueryScope,
|
|
storage.DevstorageReadOnlyScope,
|
|
"https://www.googleapis.com/auth/userinfo.profile")
|
|
registerDemo("bigquery", scope, bqMain)
|
|
}
|
|
|
|
// This example demonstrates loading objects from Google Cloud Storage into
|
|
// BigQuery. Objects are specified by their bucket and a name prefix. Each
|
|
// object will be loaded into a new table identified by the object name minus
|
|
// any file extension. All tables are added to the specified dataset (one will
|
|
// be created if necessary). Currently, tables will not be overwritten and an
|
|
// attempt to load an object into a dataset that already contains its table
|
|
// will emit an error message indicating the table already exists.
|
|
// A schema file must be provided and it will be applied to every object/table.
|
|
// Example usage:
|
|
// go-api-demo -clientid="my-clientid" -secret="my-secret" bq myProject
|
|
// myDataBucket datafile2013070 DataFiles2013
|
|
// ./datafile_schema.json 100
|
|
//
|
|
// This will load all objects (e.g. all data files from July 2013) from
|
|
// gs://myDataBucket into a (possibly new) BigQuery dataset named DataFiles2013
|
|
// using the schema file provided and allowing up to 100 bad records. Assuming
|
|
// each object is named like datafileYYYYMMDD.csv.gz and all of July's files are
|
|
// stored in the bucket, 9 tables will be created named like datafile201307DD
|
|
// where DD ranges from 01 to 09, inclusive.
|
|
// When the program completes, it will emit a results line similar to:
|
|
//
|
|
// 9 files loaded in 3m58s (18m2.708s). Size: 7.18GB Rows: 7130725
|
|
//
|
|
// The total elapsed time from the start of first job to the end of the last job
|
|
// (effectively wall clock time) is shown. In parenthesis is the aggregate time
|
|
// taken to load all tables.
|
|
func bqMain(client *http.Client, argv []string) {
|
|
if len(argv) != 6 {
|
|
fmt.Fprintln(os.Stderr,
|
|
"Usage: bq project_id bucket prefix dataset schema max_bad_records")
|
|
return
|
|
}
|
|
|
|
var (
|
|
project = argv[0]
|
|
bucket = argv[1]
|
|
objPrefix = argv[2]
|
|
datasetId = argv[3]
|
|
schemaFile = argv[4]
|
|
)
|
|
badRecords, err := strconv.ParseInt(argv[5], 10, 64)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
return
|
|
}
|
|
|
|
rand.Seed(time.Now().UnixNano())
|
|
|
|
service, err := storage.New(client)
|
|
if err != nil {
|
|
log.Fatalf("Unable to create Storage service: %v", err)
|
|
}
|
|
|
|
// Get the list of objects in the bucket matching the specified prefix.
|
|
list := service.Objects.List(bucket)
|
|
list.Prefix(objPrefix)
|
|
objects, err := list.Do()
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
return
|
|
}
|
|
|
|
// Create the wrapper and insert the (new) dataset.
|
|
dataset, err := newBQDataset(client, project, datasetId)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
return
|
|
}
|
|
if err = dataset.insert(true); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
return
|
|
}
|
|
|
|
objectSource := &tableSource{
|
|
maxBadRecords: badRecords,
|
|
disposition: TableWriteEmptyDisposition,
|
|
}
|
|
|
|
// Load the schema from disk.
|
|
f, err := ioutil.ReadFile(schemaFile)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
return
|
|
}
|
|
if err = json.Unmarshal(f, &objectSource.schema); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
return
|
|
}
|
|
|
|
// Assumes all objects have .csv, .csv.gz (or no) extension.
|
|
tableIdFromObject := func(name string) string {
|
|
return strings.TrimSuffix(strings.TrimSuffix(name, ".gz"), ".csv")
|
|
}
|
|
|
|
// A jobset is way to group a collection of jobs together for monitoring.
|
|
// For this example, we just use the name of the bucket and object prefix.
|
|
jobset := fmt.Sprintf("%s:%s", bucket, objPrefix)
|
|
fmt.Fprintf(os.Stderr, "\nLoading %d objects.\n", len(objects.Items))
|
|
|
|
// Load each object into a dataset of the same name (minus any extension).
|
|
// A successful insert call will inject the job into our queue for monitoring.
|
|
for _, o := range objects.Items {
|
|
objectSource.id = tableIdFromObject(o.Name)
|
|
objectSource.uri = fmt.Sprintf("gs://%s/%s", o.Bucket, o.Name)
|
|
if err = dataset.load(jobset, objectSource); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
}
|
|
}
|
|
|
|
dataset.monitor(jobset)
|
|
}
|
|
|
|
// Wraps the BigQuery service and dataset and provides some helper functions.
|
|
type bqDataset struct {
|
|
project string
|
|
id string
|
|
bq *bigquery.Service
|
|
dataset *bigquery.Dataset
|
|
jobsets map[string]*list.List
|
|
}
|
|
|
|
func newBQDataset(client *http.Client, dsProj string, dsId string) (*bqDataset,
|
|
error) {
|
|
|
|
service, err := bigquery.New(client)
|
|
if err != nil {
|
|
log.Fatalf("Unable to create BigQuery service: %v", err)
|
|
}
|
|
|
|
return &bqDataset{
|
|
project: dsProj,
|
|
id: dsId,
|
|
bq: service,
|
|
dataset: &bigquery.Dataset{
|
|
DatasetReference: &bigquery.DatasetReference{
|
|
DatasetId: dsId,
|
|
ProjectId: dsProj,
|
|
},
|
|
},
|
|
jobsets: make(map[string]*list.List),
|
|
}, nil
|
|
}
|
|
|
|
func (ds *bqDataset) insert(existsOK bool) error {
|
|
call := ds.bq.Datasets.Insert(ds.project, ds.dataset)
|
|
_, err := call.Do()
|
|
if err != nil && (!existsOK || !strings.Contains(err.Error(),
|
|
DatasetAlreadyExists)) {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type tableSource struct {
|
|
id string
|
|
uri string
|
|
schema bigquery.TableSchema
|
|
maxBadRecords int64
|
|
disposition string
|
|
}
|
|
|
|
func (ds *bqDataset) load(jobset string, source *tableSource) error {
|
|
job := &bigquery.Job{
|
|
Configuration: &bigquery.JobConfiguration{
|
|
Load: &bigquery.JobConfigurationLoad{
|
|
DestinationTable: &bigquery.TableReference{
|
|
DatasetId: ds.dataset.DatasetReference.DatasetId,
|
|
ProjectId: ds.project,
|
|
TableId: source.id,
|
|
},
|
|
MaxBadRecords: source.maxBadRecords,
|
|
Schema: &source.schema,
|
|
SourceUris: []string{source.uri},
|
|
WriteDisposition: source.disposition,
|
|
},
|
|
},
|
|
}
|
|
|
|
call := ds.bq.Jobs.Insert(ds.project, job)
|
|
job, err := call.Do()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, ok := ds.jobsets[jobset]
|
|
if !ok {
|
|
ds.jobsets[jobset] = list.New()
|
|
}
|
|
ds.jobsets[jobset].PushBack(job)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ds *bqDataset) getJob(id string) (*bigquery.Job, error) {
|
|
return ds.bq.Jobs.Get(ds.project, id).Do()
|
|
}
|
|
|
|
func (ds *bqDataset) monitor(jobset string) {
|
|
jobq, ok := ds.jobsets[jobset]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
var backoff float64 = BaseBackoff
|
|
pause := func(grow bool) {
|
|
if grow {
|
|
backoff *= BackoffGrowthFactor
|
|
backoff -= (backoff * rand.Float64() * BackoffGrowthDamper)
|
|
backoff = math.Min(backoff, MaxBackoff)
|
|
fmt.Fprintf(os.Stderr, "[%s] Checking remaining %d jobs...\n", jobset,
|
|
1+jobq.Len())
|
|
}
|
|
time.Sleep(time.Duration(backoff) * time.Millisecond)
|
|
}
|
|
var stats jobStats
|
|
|
|
// Track a 'head' pending job in queue for detecting cycling.
|
|
head := ""
|
|
// Loop until all jobs are done - with either success or error.
|
|
for jobq.Len() > 0 {
|
|
jel := jobq.Front()
|
|
job := jel.Value.(*bigquery.Job)
|
|
jobq.Remove(jel)
|
|
jid := job.JobReference.JobId
|
|
loop := false
|
|
|
|
// Check and possibly pick a new head job id.
|
|
if len(head) == 0 {
|
|
head = jid
|
|
} else {
|
|
if jid == head {
|
|
loop = true
|
|
}
|
|
}
|
|
|
|
// Retrieve the job's current status.
|
|
pause(loop)
|
|
j, err := ds.getJob(jid)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
// In this case of a transient API error, we want keep the job.
|
|
if j == nil {
|
|
jobq.PushBack(job)
|
|
} else {
|
|
// Must reset head tracker if job is discarded.
|
|
if loop {
|
|
head = ""
|
|
backoff = BaseBackoff
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Reassign with the updated job data (from Get).
|
|
// We don't use j here as Get might return nil for this value.
|
|
job = j
|
|
|
|
if job.Status.State != JobStatusDone {
|
|
jobq.PushBack(job)
|
|
continue
|
|
}
|
|
|
|
if res := job.Status.ErrorResult; res != nil {
|
|
fmt.Fprintln(os.Stderr, res.Message)
|
|
} else {
|
|
stat := job.Statistics
|
|
lstat := stat.Load
|
|
stats.files += 1
|
|
stats.bytesIn += lstat.InputFileBytes
|
|
stats.bytesOut += lstat.OutputBytes
|
|
stats.rows += lstat.OutputRows
|
|
stats.elapsed +=
|
|
time.Duration(stat.EndTime-stat.StartTime) * time.Millisecond
|
|
|
|
if stats.start.IsZero() {
|
|
stats.start = time.Unix(stat.StartTime/1000, 0)
|
|
} else {
|
|
t := time.Unix(stat.StartTime/1000, 0)
|
|
if stats.start.Sub(t) > 0 {
|
|
stats.start = t
|
|
}
|
|
}
|
|
|
|
if stats.finish.IsZero() {
|
|
stats.finish = time.Unix(stat.EndTime/1000, 0)
|
|
} else {
|
|
t := time.Unix(stat.EndTime/1000, 0)
|
|
if t.Sub(stats.finish) > 0 {
|
|
stats.finish = t
|
|
}
|
|
}
|
|
}
|
|
// When the head job is processed reset the backoff since the loads
|
|
// run in BQ in parallel.
|
|
if loop {
|
|
head = ""
|
|
backoff = BaseBackoff
|
|
}
|
|
}
|
|
|
|
fmt.Fprintf(os.Stderr, "%#v\n", stats)
|
|
}
|
|
|
|
type jobStats struct {
|
|
// Number of files (sources) loaded.
|
|
files int64
|
|
// Bytes read from source (possibly compressed).
|
|
bytesIn int64
|
|
// Bytes loaded into BigQuery (uncompressed).
|
|
bytesOut int64
|
|
// Rows loaded into BigQuery.
|
|
rows int64
|
|
// Time taken to load source into table.
|
|
elapsed time.Duration
|
|
// Start time of the job.
|
|
start time.Time
|
|
// End time of the job.
|
|
finish time.Time
|
|
}
|
|
|
|
func (s jobStats) GoString() string {
|
|
return fmt.Sprintf("\n%d files loaded in %v (%v). Size: %.2fGB Rows: %d\n",
|
|
s.files, s.finish.Sub(s.start), s.elapsed, float64(s.bytesOut)/GB,
|
|
s.rows)
|
|
}
|