Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
16013ce
Added a new extraction pipeline for AFUS and updated some of the filt…
quazi-h Dec 7, 2021
99391a4
Added the afus_owner schema to the project
quazi-h Dec 8, 2021
7001a74
Updates to AFUS extraction, updating tests.
quazi-h Dec 9, 2021
3e6547b
Changes to AFUS Extraction, TESTING
quazi-h Dec 13, 2021
e53ac45
Adding "baseline_arm_1" arm to list of RedCap events to pull from.
quazi-h Dec 14, 2021
c9e167a
New AfusTransformation pipeline added, logic to process the afus_owne…
quazi-h Dec 14, 2021
9baa062
Trimming whitespace from fields that are to be stripped.
quazi-h Dec 22, 2021
827ca0c
"Using the correct method to pull owner_id, no longer need to cast th…
quazi-h Dec 22, 2021
3645a23
Removing extraction changes from this branch (branched from extractio…
quazi-h Dec 22, 2021
91a3e1d
Merge branch 'master' into qh-dspdc-1958-afus-owner-transformations
quazi-h Dec 22, 2021
ed20d9a
Added forms needed for afus_dog extraction.
quazi-h Dec 22, 2021
326b360
Remaining fragments and transformation scripts for afus dog tables.
quazi-h Jan 19, 2022
a690954
Decreased extract batch size to 1, using a batch size of 5-10 caused …
quazi-h Jan 19, 2022
f103a76
Removed an unnecessary link in afus_owner table.
quazi-h Jan 19, 2022
1bcb9c4
Merge branch 'qh-dspdc-1958-afus-owner-transformations' into qh-dspdc…
quazi-h Jan 19, 2022
0504a3c
Merge branch 'master' into qh-dspdc-1958-afus-dog
quazi-h Jan 20, 2022
f7c92d8
Merge branch 'master' into qh-dspdc-1958-afus-dog
quazi-h Jan 20, 2022
01cb320
Some changes to address issues that came up when reviewing the data
quazi-h Jan 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@ object AfusExtractionPipeline extends ScioApp[Args] {
"followup_status",
"followup_owner_contact",
"study_status",
"followup_owner_demographics"
"followup_owner_demographics",
"followup_physical_activity",
"followup_environment",
"followup_behavior",
"followup_diet",
"followup_meds_and_preventives",
"followup_canine_eating_behavior_dora",
"followup_dogowner_relationship_survey_mdors",
"followup_health_status"
)

def extractionFiltersGenerator(args: Args): List[FilterDirective] = {
Expand Down Expand Up @@ -91,7 +99,7 @@ object AfusExtractionPipeline extends ScioApp[Args] {
extractionArmsGenerator,
fieldList,
subdir,
10,
1,
RedCapClient.apply(_: List[String], wrapper)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.broadinstitute.monster.dap.afus

import org.broadinstitute.monster.dap.afus.AfusTransformationPipelineBuilder.logger
import org.broadinstitute.monster.dap.common.{TransformationError, MissingOwnerIdError, RawRecord}
import org.broadinstitute.monster.dogaging.jadeschema.table.AfusDog

/** Maps raw RedCap records onto the `AfusDog` table row. */
object AfusDogTransformations {

  /**
    * Build an `AfusDog` row from a raw record.
    *
    * @param rawRecord the raw record to transform
    * @return `Some(AfusDog)` on success; `None` when `st_owner_id` is absent or when a
    *         `TransformationError` is raised while mapping (the error is logged in both cases)
    */
  def mapAfusDog(rawRecord: RawRecord): Option[AfusDog] = {
    rawRecord.getOptionalNumber("st_owner_id") match {
      case None =>
        MissingOwnerIdError("Record has less than 1 value for field st_owner_id").log
        None
      case Some(owner_id) =>
        try {
          val completeDate = rawRecord.getOptionalDate("fu_complete_date")
          val redcapEventName = rawRecord.getOptional("redcap_event_name")
          Some(
            AfusDog(
              dogId = rawRecord.id,
              ownerId = owner_id,
              ownerIsVipOrStaff = rawRecord.getOptionalNumber("st_vip_or_staff"),
              afusCompleteDate = completeDate,
              afusCalendarYear = completeDate.map(_.getYear.toLong),
              // todo: redcap_event_name is not available with the current grouping of records (see TransformationHelper)
              // todo: figure out how to do the correct grouping of records
              afusFollowupYear = redcapEventName.flatMap(parseFollowupYear),
              afusDogDemographics = Some(DemographicsTransformations.mapDemographics(rawRecord)),
              afusDogPhysicalActivity =
                Some(PhysicalActivityTransformations.mapPhysicalActivities(rawRecord)),
              afusDogEnvironment = Some(EnvironmentTransformations.mapEnvironment(rawRecord)),
              afusDogBehavior = Some(BehaviorTransformations.mapBehavior(rawRecord)),
              afusDogDiet = Some(DietTransformations.mapDiet(rawRecord)),
              afusDogMedsPreventatives =
                Some(MedsPreventativesTransformations.mapMedsPreventatives(rawRecord)),
              afusDogDora = Some(DoraTransformations.mapDora(rawRecord)),
              afusDogMdors = Some(MdorsTransformations.mapMdors(rawRecord)),
              afusDogHealthStatus = Some(HealthStatusTransformations.mapHealthStatus(rawRecord))
            )
          )
        } catch {
          case e: TransformationError =>
            e.log
            None
        }
    }
  }

  /**
    * Extract the follow-up year from a RedCap event name.
    *
    * The year is taken from the second `_`-separated token of the event name and must parse
    * as a whole number. A missing token or a non-numeric token would otherwise raise
    * `ArrayIndexOutOfBoundsException` / `NumberFormatException`, neither of which is handled
    * by the `TransformationError` catch in `mapAfusDog`, so the parse is wrapped and a
    * warning is logged instead.
    *
    * @param eventName value of the `redcap_event_name` field
    * @return the parsed year, or `None` when the event name does not have the expected shape
    */
  private def parseFollowupYear(eventName: String): Option[Long] = {
    val parsed = scala.util.Try(eventName.split("_")(1).toLong).toOption
    if (parsed.isEmpty) {
      logger.warn(s"Unable to parse a follow-up year from redcap_event_name '$eventName'")
    }
    parsed
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.broadinstitute.monster.dap.afus

import org.broadinstitute.monster.dap.afus.AfusTransformationPipelineBuilder.logger
import org.broadinstitute.monster.dap.common.{TransformationError, MissingOwnerIdError, RawRecord}
import org.broadinstitute.monster.dogaging.jadeschema.table.AfusOwner

/** Maps raw RedCap records onto the `AfusOwner` table row. */
object AfusOwnerTransformations {

  /**
    * Build an `AfusOwner` row from a raw record.
    *
    * @param rawRecord the raw record to transform
    * @return `Some(AfusOwner)` on success; `None` when `st_owner_id` is absent or when a
    *         `TransformationError` is raised while mapping (the error is logged in both cases)
    */
  def mapAfusOwnerData(rawRecord: RawRecord): Option[AfusOwner] =
    rawRecord.getOptionalNumber("st_owner_id").fold[Option[AfusOwner]] {
      // No owner id: log the problem and emit nothing for this record.
      MissingOwnerIdError("Record has less than 1 value for field st_owner_id").log
      None
    } { ownerIdValue =>
      try {
        val ownerRow = AfusOwner(
          ownerId = ownerIdValue,
          // Household composition
          afusOcHouseholdPersonCount = rawRecord.getOptionalNumber("fu_oc_people_household"),
          afusOcHouseholdAdultCount = rawRecord.getOptionalNumber("fu_oc_adults_household"),
          afusOcHouseholdChildCount = rawRecord.getOptionalNumber("fu_oc_children_household"),
          // Primary residence
          afusOcPrimaryAddressChange = rawRecord.getOptionalBoolean("fu_oc_address_change"),
          afusOcPrimaryAddressChangeDate =
            rawRecord.getOptionalDate("fu_oc_address1_change_date"),
          afusOcPrimaryResidenceOwnership = rawRecord.getOptionalNumber("fu_oc_address1_own"),
          afusOcPrimaryResidenceOwnershipOtherDescription =
            rawRecord.getOptionalStripped("fu_oc_address1_own_other"),
          afusOcPrimaryResidenceState = rawRecord.getOptionalStripped("fu_oc_address1_state"),
          afusOcPrimaryResidenceCensusDivision =
            rawRecord.getOptionalNumber("fu_oc_address1_division"),
          afusOcPrimaryResidenceTimePercentage =
            rawRecord.getOptionalNumber("fu_oc_address1_pct"),
          // Secondary residence
          afusOcSecondaryResidence = rawRecord.getOptionalNumber("fu_oc_address2_yn"),
          afusOcSecondaryAddressChange = rawRecord.getOptionalBoolean("fu_oc_address2_change"),
          afusOcSecondaryResidenceChangeDate =
            rawRecord.getOptionalDate("fu_oc_address2_change_date"),
          afusOcSecondaryResidenceOwnership = rawRecord.getOptionalNumber("fu_oc_address2_own"),
          afusOcSecondaryResidenceOwnershipOtherDescription =
            rawRecord.getOptionalStripped("fu_oc_address2_own_other"),
          afusOcSecondaryResidenceState = rawRecord.getOptionalStripped("fu_oc_address2_state"),
          afusOcSecondaryResidenceTimePercentage =
            rawRecord.getOptionalNumber("fu_oc_2nd_address_pct"),
          // Owner demographics
          afusOdAgeRangeYears = rawRecord.getOptionalNumber("fu_od_age"),
          afusOdMaxEducation = rawRecord.getOptionalNumber("fu_od_education"),
          afusOdMaxEducationOtherDescription =
            rawRecord.getOptionalStripped("fu_od_education_other"),
          afusOdAnnualIncomeRangeUsd = rawRecord.getOptionalNumber("fu_od_income")
        )
        Some(ownerRow)
      } catch {
        case failure: TransformationError =>
          failure.log
          None
      }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.broadinstitute.monster.dap.afus

import com.spotify.scio.ScioResult
import org.broadinstitute.monster.common.{PipelineBuilder, ScioApp}
import org.broadinstitute.monster.dap.common.{Args, PostProcess}

/**
  * Entry-point for the AFUS transformation pipeline.
  *
  * Plugs `AfusTransformationPipelineBuilder` into the `ScioApp` harness and hands the
  * `ScioResult` to the shared `PostProcess.postProcess` function.
  */
object AfusTransformationPipeline extends ScioApp[Args] {

// The pipeline steps themselves are defined by AfusTransformationPipelineBuilder.
override def pipelineBuilder: PipelineBuilder[Args] = AfusTransformationPipelineBuilder
// Function applied to the ScioResult; delegates to the common PostProcess.postProcess.
override def postProcess: ScioResult => Unit = PostProcess.postProcess
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.broadinstitute.monster.dap.afus

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import org.broadinstitute.monster.common.{PipelineBuilder, StorageIO}
import org.broadinstitute.monster.dap.common._
import org.slf4j.{Logger, LoggerFactory}

/** Builds the AFUS transformation pipeline: read raw records, map them, write the results. */
object AfusTransformationPipelineBuilder extends PipelineBuilder[Args] {

  /** Implicit logger, declared here so it is associated with this PipelineBuilder object. */
  implicit val logger: Logger = LoggerFactory.getLogger(getClass)

  /**
    * Schedule all the steps for the AFUS transformation in the given pipeline context.
    *
    * Scheduled steps are launched against the context's runner when the `run()` method
    * is called on it.
    *
    * @param ctx  the Scio context the steps are registered on
    * @param args pipeline arguments; `inputPrefix` and `outputPrefix` are read here
    */
  override def buildPipeline(ctx: ScioContext, args: Args): Unit = {
    val records = readRecords(ctx, args)

    // Each mapper returns an Option, so records that fail to map are dropped by flatMap.
    val ownerRows =
      records.transform("Map AFUS Owners")(_.flatMap(AfusOwnerTransformations.mapAfusOwnerData))
    val dogRows =
      records.transform("Map AFUS Dogs")(_.flatMap(AfusDogTransformations.mapAfusDog))

    val ownerOutputPath = s"${args.outputPrefix}/afus_owner"
    val dogOutputPath = s"${args.outputPrefix}/afus_dog"

    StorageIO.writeJsonLists(ownerRows, "AFUS Owners", ownerOutputPath)
    StorageIO.writeJsonLists(dogRows, "AFUS Dogs", dogOutputPath)
    ()
  }

  /**
    * Read the raw records found under `args.inputPrefix`.
    *
    * Delegates to `TransformationHelper.readRecordsGroupByStudyId`, which (per its name)
    * groups the records by study id.
    */
  def readRecords(ctx: ScioContext, args: Args): SCollection[RawRecord] =
    TransformationHelper.readRecordsGroupByStudyId(ctx, args.inputPrefix)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.broadinstitute.monster.dap.afus

import org.broadinstitute.monster.dap.common.RawRecord
import org.broadinstitute.monster.dogaging.jadeschema.fragment.AfusDogBehavior

/** Maps the `fu_db_*` fields of a raw record onto the `AfusDogBehavior` fragment. */
object BehaviorTransformations {

  /**
    * Build a complete `AfusDogBehavior` fragment from a raw record, starting from
    * `AfusDogBehavior.init()`.
    */
  def mapBehavior(rawRecord: RawRecord): AfusDogBehavior =
    mapBehaviors(rawRecord, AfusDogBehavior.init())

  /**
    * Return a copy of `dog` with every behavior field populated from the matching
    * `fu_db_*` field of `rawRecord`, each read via `getOptionalNumber`.
    */
  def mapBehaviors(rawRecord: RawRecord, dog: AfusDogBehavior): AfusDogBehavior = {
    // Shorthand for the optional numeric lookup used by every field below.
    def num(field: String) = rawRecord.getOptionalNumber(field)

    dog.copy(
      // Excitement
      afusDbExcitementLevelBeforeWalk = num("fu_db_e_before_walk"),
      afusDbExcitementLevelBeforeCarRide = num("fu_db_e_before_car"),
      // Aggression
      afusDbAggressionLevelOnLeashUnknownHuman = num("fu_db_a_approach_walk"),
      afusDbAggressionLevelToysTakenAway = num("fu_db_a_toy_family"),
      afusDbAggressionLevelApproachedWhileEating = num("fu_db_a_food_approach_family"),
      afusDbAggressionLevelDeliveryWorkersAtHome = num("fu_db_a_delivery"),
      afusDbAggressionLevelFoodTakenAway = num("fu_db_a_food_take_family"),
      afusDbAggressionLevelOnLeashUnknownDog = num("fu_db_a_dog_approach"),
      afusDbAggressionLevelUnknownHumanNearYard = num("fu_db_a_stranger_yard"),
      afusDbAggressionLevelUnknownAggressiveDog = num("fu_db_a_dog_growled_at"),
      afusDbAggressionLevelFamiliarDogWhileEating = num("fu_db_a_food_familiar_dog"),
      afusDbAggressionLevelFamiliarDogWhilePlaying = num("fu_db_a_toy_familiar_dog"),
      // Fear
      afusDbFearLevelUnknownHumanAwayFromHome = num("fu_db_f_approach_stranger"),
      afusDbFearLevelLoudNoises = num("fu_db_f_noise"),
      afusDbFearLevelUnknownHumanTouch = num("fu_db_f_unfamiliar_touch"),
      afusDbFearLevelUnknownObjectsOutside = num("fu_db_f_strange_objects"),
      afusDbFearLevelUnknownDogs = num("fu_db_f_strange_dog"),
      afusDbFearLevelUnknownSituations = num("fu_db_f_strange_situations"),
      afusDbFearLevelUnknownAggressiveDog = num("fu_db_f_strange_dog_growl"),
      afusDbFearLevelNailsClippedAtHome = num("fu_db_f_nails_family"),
      afusDbFearLevelBathedAtHome = num("fu_db_f_bath_family"),
      // Behavior when left alone
      afusDbLeftAloneRestlessnessFrequency = num("fu_db_s_agitation"),
      afusDbLeftAloneBarkingFrequency = num("fu_db_s_bark"),
      afusDbLeftAloneScratchingFrequency = num("fu_db_s_scratching"),
      // Attention seeking
      afusDbAttentionSeekingFollowsHumansFrequency = num("fu_db_at_follow"),
      afusDbAttentionSeekingSitsCloseToHumansFrequency = num("fu_db_at_sit_close"),
      // Training
      afusDbTrainingObeysSitCommandFrequency = num("fu_db_td_sit"),
      afusDbTrainingObeysStayCommandFrequency = num("fu_db_td_stay"),
      afusDbTrainingDistractionFrequency = num("fu_db_td_distracted"),
      // Miscellaneous behaviors
      afusDbChasesBirdsFrequency = num("fu_db_m_chase_bird"),
      afusDbChasesSquirrelsFrequency = num("fu_db_m_chase_squirrel"),
      afusDbEscapesHomeOrPropertyFrequency = num("fu_db_m_escape"),
      afusDbChewsInappropriateObjectsFrequency = num("fu_db_m_chew"),
      afusDbPullsLeashFrequency = num("fu_db_m_pull"),
      afusDbUrinatesInHomeFrequency = num("fu_db_m_urine_object"),
      afusDbUrinatesAloneFrequency = num("fu_db_m_urine_alone"),
      afusDbDefecatesAloneFrequency = num("fu_db_m_defecate_alone"),
      afusDbHyperactiveFrequency = num("fu_db_m_hyperactive"),
      afusDbPlayfulFrequency = num("fu_db_m_playful"),
      afusDbEnergeticFrequency = num("fu_db_m_active"),
      afusDbChasesTailFrequency = num("fu_db_m_chase_tail"),
      afusDbBarksFrequency = num("fu_db_m_bark")
    )
  }
}
Loading