Skip to content

Commit

Permalink
chore: applying 1.2.4 hotfixes to main branch (#2597)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Oct 20, 2022
1 parent 83ace48 commit 0e1da7e
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 79 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## [1.2.4](https://github.com/rudderlabs/rudder-server/compare/v1.2.3...v1.2.4) (2022-10-19)


### Bug Fixes

* **warehouse:** default job type ([#2595](https://github.com/rudderlabs/rudder-server/issues/2595)) ([fd2652a](https://github.com/rudderlabs/rudder-server/commit/fd2652a0849c390879461baf520dc2cdb90dbb9e))

## [1.2.3](https://github.com/rudderlabs/rudder-server/compare/v1.2.2...v1.2.3) (2022-10-19)


### Bug Fixes

* **warehouse:** remove bad unlock ([#2590](https://github.com/rudderlabs/rudder-server/issues/2590)) ([aba7893](https://github.com/rudderlabs/rudder-server/commit/aba7893850dbb5ed224bc82bc19f1924b65874d2))

## [1.2.2](https://github.com/rudderlabs/rudder-server/compare/v1.2.1...v1.2.2) (2022-10-19)


Expand Down
164 changes: 85 additions & 79 deletions services/pgnotifier/pgnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ func (notifier PgNotifierT) ClearJobs(ctx context.Context) (err error) {
// additional safety check to not delete all jobs with empty workspaceIdentifier
if notifier.workspaceIdentifier != "" {
stmt := fmt.Sprintf(`
DELETE FROM
%s
WHERE
DELETE FROM
%s
WHERE
workspace = '%s';
`,
queueName,
Expand Down Expand Up @@ -205,13 +205,13 @@ func (notifier *PgNotifierT) trackUploadBatch(batchID string, ch *chan []Respons
// keep polling db for batch status
// or subscribe to triggers
stmt := fmt.Sprintf(`
SELECT
count(*)
FROM
%s
WHERE
batch_id = '%s'
AND status != '%s'
SELECT
count(*)
FROM
%s
WHERE
batch_id = '%s'
AND status != '%s'
AND status != '%s';
`,
queueName,
Expand All @@ -228,14 +228,14 @@ func (notifier *PgNotifierT) trackUploadBatch(batchID string, ch *chan []Respons

if count == 0 {
stmt = fmt.Sprintf(`
SELECT
payload -> 'StagingFileID',
payload -> 'Output',
status,
error
FROM
%s
WHERE
SELECT
payload -> 'StagingFileID',
payload -> 'Output',
status,
error
FROM
%s
WHERE
batch_id = '%s';
`,
queueName,
Expand Down Expand Up @@ -264,9 +264,9 @@ func (notifier *PgNotifierT) trackUploadBatch(batchID string, ch *chan []Respons
*ch <- responses
pkgLogger.Infof("PgNotifier: Completed processing all files in batch: %s", batchID)
stmt = fmt.Sprintf(`
DELETE FROM
%s
WHERE
DELETE FROM
%s
WHERE
batch_id = '%s';
`,
queueName,
Expand Down Expand Up @@ -344,20 +344,20 @@ func (notifier *PgNotifierT) UpdateClaimedEvent(claim *ClaimT, response *ClaimRe
if response.Err != nil {
pkgLogger.Error(response.Err.Error())
stmt := fmt.Sprintf(`
UPDATE
%[1]s
SET
UPDATE
%[1]s
SET
status =(
CASE WHEN attempt > %[2]d THEN CAST (
'%[3]s' AS pg_notifier_status_type
) ELSE CAST(
'%[4]s' AS pg_notifier_status_type
) END
),
attempt = attempt + 1,
updated_at = '%[5]s',
error = %[6]s
WHERE
),
attempt = attempt + 1,
updated_at = '%[5]s',
error = %[6]s
WHERE
id = %[7]v;
`,
queueName,
Expand All @@ -380,13 +380,13 @@ func (notifier *PgNotifierT) UpdateClaimedEvent(claim *ClaimT, response *ClaimRe
}
} else {
stmt := fmt.Sprintf(`
UPDATE
%[1]s
SET
status = '%[2]s',
updated_at = '%[3]s',
payload = $1
WHERE
UPDATE
%[1]s
SET
status = '%[2]s',
updated_at = '%[3]s',
payload = $1
WHERE
id = %[4]v;
`,
queueName,
Expand Down Expand Up @@ -416,37 +416,38 @@ func (notifier *PgNotifierT) claim(workerID string) (claim ClaimT, err error) {
}()
var claimedID int64
var attempt int
var batchID, status, workspace, job_type string
var batchID, status, workspace string
var jobType sql.NullString
var payload json.RawMessage
stmt := fmt.Sprintf(`
UPDATE
%[1]s
SET
status = '%[2]s',
updated_at = '%[3]s',
last_exec_time = '%[3]s',
worker_id = '%[4]v'
WHERE
UPDATE
%[1]s
SET
status = '%[2]s',
updated_at = '%[3]s',
last_exec_time = '%[3]s',
worker_id = '%[4]v'
WHERE
id = (
SELECT
id
FROM
%[1]s
WHERE
status = '%[5]s'
OR status = '%[6]s'
ORDER BY
priority ASC,
id ASC FOR
UPDATE
SKIP LOCKED
LIMIT
SELECT
id
FROM
%[1]s
WHERE
status = '%[5]s'
OR status = '%[6]s'
ORDER BY
priority ASC,
id ASC FOR
UPDATE
SKIP LOCKED
LIMIT
1
) RETURNING id,
batch_id,
status,
payload,
workspace,
) RETURNING id,
batch_id,
status,
payload,
workspace,
attempt,
job_type;
`,
Expand All @@ -462,7 +463,7 @@ func (notifier *PgNotifierT) claim(workerID string) (claim ClaimT, err error) {
if err != nil {
return
}
err = tx.QueryRow(stmt).Scan(&claimedID, &batchID, &status, &payload, &workspace, &attempt, &job_type)
err = tx.QueryRow(stmt).Scan(&claimedID, &batchID, &status, &payload, &workspace, &attempt, &jobType)
defer func() {
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
Expand All @@ -484,14 +485,19 @@ func (notifier *PgNotifierT) claim(workerID string) (claim ClaimT, err error) {
return
}

// fallback to upload if jobType is not valid
if !jobType.Valid {
jobType = sql.NullString{String: "upload", Valid: true}
}

claim = ClaimT{
ID: claimedID,
BatchID: batchID,
Status: status,
Payload: payload,
Attempt: attempt,
Workspace: workspace,
JobType: job_type,
JobType: jobType.String,
}
return claim, nil
}
Expand Down Expand Up @@ -668,21 +674,21 @@ func (notifier *PgNotifierT) RunMaintenanceWorker(ctx context.Context) error {
}
for {
stmt := fmt.Sprintf(`
UPDATE
%[1]s
SET
status = '%[3]s',
updated_at = '%[2]s'
WHERE
UPDATE
%[1]s
SET
status = '%[3]s',
updated_at = '%[2]s'
WHERE
id IN (
SELECT
id
FROM
%[1]s
WHERE
status = '%[4]s'
AND last_exec_time <= NOW() - INTERVAL '%[5]v seconds' FOR
UPDATE
SELECT
id
FROM
%[1]s
WHERE
status = '%[4]s'
AND last_exec_time <= NOW() - INTERVAL '%[5]v seconds' FOR
UPDATE
SKIP LOCKED
) RETURNING id;
`,
Expand Down

0 comments on commit 0e1da7e

Please sign in to comment.