Skip to content

Construct column set dynamicaly #1373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions services/apps/data_sink_worker/src/queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { DbConnection, DbStore } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { Logger, logExecutionTimeV2 } from '@crowd/logging'
import {
DATA_SINK_WORKER_QUEUE_SETTINGS,
NodejsWorkerEmitter,
Expand Down Expand Up @@ -39,7 +39,11 @@ export class WorkerQueueReceiver extends SqsQueueReceiver {

switch (message.type) {
case DataSinkWorkerQueueMessageType.PROCESS_INTEGRATION_RESULT:
await service.processResult((message as ProcessIntegrationResultQueueMessage).resultId)
await logExecutionTimeV2(
() => service.processResult((message as ProcessIntegrationResultQueueMessage).resultId),
this.log,
'dataSinkService.processResult',
)
break
default:
throw new Error(`Unknown message type: ${message.type}`)
Expand Down
14 changes: 11 additions & 3 deletions services/apps/data_sink_worker/src/repo/member.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,23 @@ export default class MemberRepository extends RepositoryBase<MemberRepository> {
}

public async update(id: string, tenantId: string, data: IDbMemberUpdateData): Promise<void> {
const keys = Object.keys(data)
keys.push('updatedAt')
// construct custom column set
const dynamicColumnSet = new this.dbInstance.helpers.ColumnSet(keys, {
table: {
table: 'members',
},
})

const prepared = RepositoryBase.prepare(
{
...data,
weakIdentities: JSON.stringify(data.weakIdentities || []),
updatedAt: new Date(),
},
this.updateMemberColumnSet,
dynamicColumnSet,
)
const query = this.dbInstance.helpers.update(prepared, this.updateMemberColumnSet)
const query = this.dbInstance.helpers.update(prepared, dynamicColumnSet)

const condition = this.format('where id = $(id) and "tenantId" = $(tenantId)', {
id,
Expand Down
121 changes: 77 additions & 44 deletions services/apps/data_sink_worker/src/service/dataSink.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { DbStore } from '@crowd/database'
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
import { Logger, LoggerBase, getChildLogger, logExecutionTimeV2 } from '@crowd/logging'
import {
IActivityData,
IMemberData,
Expand Down Expand Up @@ -49,7 +49,12 @@ export default class DataSinkService extends LoggerBase {
public async processResult(resultId: string): Promise<void> {
this.log.debug({ resultId }, 'Processing result.')

const resultInfo = await this.repo.getResultInfo(resultId)
const resultInfo = await logExecutionTimeV2(
() => this.repo.getResultInfo(resultId),
this.log,
'DataSinkRepo.getResultInfo',
)


if (!resultInfo) {
this.log.error({ resultId }, 'Result not found.')
Expand Down Expand Up @@ -95,11 +100,15 @@ export default class DataSinkService extends LoggerBase {
)
const activityData = data.data as IActivityData

await service.processActivity(
resultInfo.tenantId,
resultInfo.integrationId,
resultInfo.platform,
activityData,
await logExecutionTimeV2(
() => service.processActivity(
resultInfo.tenantId,
resultInfo.integrationId,
resultInfo.platform,
activityData,
),
this.log,
'ActivityService.processActivity',
)
break
}
Expand All @@ -113,11 +122,15 @@ export default class DataSinkService extends LoggerBase {
)
const memberData = data.data as IMemberData

await service.processMemberEnrich(
resultInfo.tenantId,
resultInfo.integrationId,
resultInfo.platform,
memberData,
await logExecutionTimeV2(
() => service.processMemberEnrich(
resultInfo.tenantId,
resultInfo.integrationId,
resultInfo.platform,
memberData,
),
this.log,
'MemberService.processMemberEnrich',
)
break
}
Expand All @@ -126,11 +139,15 @@ export default class DataSinkService extends LoggerBase {
const service = new OrganizationService(this.store, this.log)
const organizationData = data.data as IOrganization

await service.processOrganizationEnrich(
resultInfo.tenantId,
resultInfo.integrationId,
resultInfo.platform,
organizationData,
await logExecutionTimeV2(
() => service.processOrganizationEnrich(
resultInfo.tenantId,
resultInfo.integrationId,
resultInfo.platform,
organizationData,
),
this.log,
'OrganizationService.processOrganizationEnrich',
)
break
}
Expand All @@ -139,40 +156,56 @@ export default class DataSinkService extends LoggerBase {
throw new Error(`Unknown result type: ${data.type}`)
}
}
await this.repo.deleteResult(resultId)
await logExecutionTimeV2(
() => this.repo.deleteResult(resultId),
this.log,
'DataSinkRepo.deleteResult',
)
} catch (err) {
this.log.error(err, 'Error processing result.')
await this.triggerResultError(
resultId,
'process-result',
'Error processing result.',
undefined,
err,
await logExecutionTimeV2(
() => this.triggerResultError(
resultId,
'process-result',
'Error processing result.',
undefined,
err,
),
this.log,
'DataSinkService.triggerResultError',
)

await sendSlackAlert({
slackURL: SLACK_ALERTING_CONFIG().url,
alertType: SlackAlertTypes.SINK_WORKER_ERROR,
integration: {
id: resultInfo.integrationId,
platform: resultInfo.platform,
tenantId: resultInfo.tenantId,
resultId: resultInfo.id,
apiDataId: resultInfo.apiDataId,
},
userContext: {
currentTenant: {
name: resultInfo.name,
plan: resultInfo.plan,
isTrial: resultInfo.isTrialPlan,
await logExecutionTimeV2(
() => sendSlackAlert({
slackURL: SLACK_ALERTING_CONFIG().url,
alertType: SlackAlertTypes.SINK_WORKER_ERROR,
integration: {
id: resultInfo.integrationId,
platform: resultInfo.platform,
tenantId: resultInfo.tenantId,
resultId: resultInfo.id,
apiDataId: resultInfo.apiDataId,
},
userContext: {
currentTenant: {
name: resultInfo.name,
plan: resultInfo.plan,
isTrial: resultInfo.isTrialPlan,
},
},
},
log: this.log,
frameworkVersion: 'new',
})
log: this.log,
frameworkVersion: 'new',
}),
this.log,
'DataSinkService -> sendSlackAlert',
)
} finally {
if (resultInfo.runId) {
await this.repo.touchRun(resultInfo.runId)
await logExecutionTimeV2(
() => this.repo.touchRun(resultInfo.runId),
this.log,
'DataSinkRepo.touchRun',
)
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,19 @@ export default class MemberService extends LoggerBase {

if (!isObjectEmpty(toUpdate)) {
this.log.debug({ memberId: id }, 'Updating a member!')
await txRepo.update(id, tenantId, {
emails: toUpdate.emails || original.emails,
joinedAt: toUpdate.joinedAt || original.joinedAt,
attributes: toUpdate.attributes || original.attributes,
weakIdentities: toUpdate.weakIdentities || original.weakIdentities,
// leave this one empty if nothing changed - we are only adding up new identities not removing them
identities: toUpdate.identities,
displayName: toUpdate.displayName || original.displayName,
})

const dateToUpdate = Object.entries(toUpdate).reduce((acc, [key, value]) => {
if (key === 'identities') {
return acc
}

if (value) {
acc[key] = value
}
return acc
}, {} as IDbMemberUpdateData)

await txRepo.update(id, tenantId, dateToUpdate)
await txRepo.addToSegment(id, tenantId, segmentId)

updated = true
Expand Down
23 changes: 23 additions & 0 deletions services/libs/logging/src/utility.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,26 @@ export const logExecutionTime = async <T>(
log.info(`Process ${name} took ${durationInSeconds.toFixed(2)} seconds!`)
}
}

export const logExecutionTimeV2 = async <T>(
process: () => Promise<T>,
log: Logger,
name: string,
): Promise<T> => {
const start = performance.now()

const end = () => {
const end = performance.now()
const duration = end - start
const durationInSeconds = duration / 1000
return durationInSeconds.toFixed(2)
}
try {
const result = await process()
log.info(`Process ${name} took ${end()} seconds!`)
return result
} catch (e) {
log.info(`Process ${name} failed after ${end()} seconds!`)
throw e
}
}