Spark sortMergeJoin runs forever

I am joining two data frames, but the join never finishes and keeps running for many hours. One task runs indefinitely, while the other 199 tasks complete within a few seconds.

I have tried repartitioning and swapping the right and left data frames, but I see a similar result. What surprises me most is the data size: the left side shows 112.5 GB and the right side shows 106.2 GB. Even after I swapped the left and right data frames, the left side still showed 112.5 GB and the right side 106.2 GB in both cases, and the row count on the left side kept increasing slowly.

stddev from the left data frame:

+----------------------+------------------+----------+
|stddev_samp(count,0,0)|        avg(count)|  count(1)|
+----------------------+------------------+----------+
|     3.787161261654356|1.0046584439496327|1690685363|
+----------------------+------------------+----------+

stddev from the right data frame:

+----------------------+-----------------+---------+
|stddev_samp(count,0,0)|       avg(count)| count(1)|
+----------------------+-----------------+---------+
|     5.384120090691418|1.029766419089913|835066016|
+----------------------+-----------------+---------+

SQL from the Spark UI

== Optimized Logical Plan ==

Project [time#265L,event_datetime_EET#248,telia_psn#496L AS psn_key#753L,telia_pdnconnectionsmobileipaddr#493 AS pdnconnectionsmobileipaddr#754,telia_pdnconnectionsmobileipv6addr#494 AS pdnconnectionsmobileipv6addr#755,telia_pdnconnectionsapn#489 AS pdnconnectionsapn#756,telia_pdnconnectionsapnambrdl#490L AS pdnconnectionsapnambrdl#757L,telia_pdnconnectionsapnambrul#491L AS pdnconnectionsapnambrul#758L,telia_pdnconnectionsdefaultbearerebi#492 AS pdnconnectionsdefaultbearerebi#759,telia_pdnconnectionsselectionmode#495 AS pdnconnectionsselectionmode#760,telia_pdnbearersebi#476 AS pdnbearersebi#761,telia_pdnbearerslinkedebi#481 AS pdnbearerslinkedebi#762,telia_pdnbearersbeareresttime#472L AS pdnbearersbeareresttime#763L,telia_pdnbearersdluserplanefteidinterface#473 AS pdnbearersdluserplanefteidinterface#764,telia_pdnbearersdluserplanefteidteid#475L AS pdnbearersdluserplanefteidteid#765L,telia_pdnbearersdluserplanefteidip#474 AS pdnbearersdluserplanefteidip#766,telia_pdnbearersuluserplanefteidinterface#486 AS pdnbearersuluserplanefteidinterface#767,telia_pdnbearersuluserplanefteidteid#488L AS pdnbearersuluserplanefteidteid#768L,telia_pdnbearersuluserplanefteidip#487 AS pdnbearersuluserplanefteidip#769,telia_pdnbearersqosqci#485 AS pdnbearersqosqci#770,telia_pdnbearersprioritylevel#484 AS pdnbearersprioritylevel#771,telia_pdnbearerspreemptioncapability#482 AS pdnbearerspreemptioncapability#772,telia_pdnbearerspreemptionvulnerability#483 AS pdnbearerspreemptionvulnerability#773,telia_pdnbearersgbrmaxbitratedl#479L AS pdnbearersgbrmaxbitratedl#774L,telia_pdnbearersgbrmaxbitrateul#480L AS pdnbearersgbrmaxbitrateul#775L,telia_pdnbearersgbrguaranteedbitratedl#477L AS pdnbearersgbrguaranteedbitratedl#776L,telia_pdnbearersgbrguaranteedbitrateul#478L AS 
pdnbearersgbrguaranteedbitrateul#777L,imsi#470L,md5ueidentityimsi#257,ueidentityimei#270L,ueidentityimeitac#271,msisdn#258L,md5msisdn#256,rattype_lu#261,locationinfotac#255,locationinfoeci#252,event_year#469,event_month#468,event_day#466,event_hour#467,ingestion_timestamp#471,child_instance_id#465 AS job_instance_id#778]
    +- Join Inner, Some((((imsi#470L = ueidentityimsi#272L) && (telia_psn#496L = psn_key#259L)) && (time#497L = time#265L)))
       :- Project [event_year#469,ingestion_timestamp#471,telia_pdnbearersebi#476,telia_pdnbearersuluserplanefteidinterface#486,telia_pdnbearersgbrguaranteedbitrateul#478L,telia_pdnbearerspreemptioncapability#482,telia_psn#496L,telia_pdnbearerslinkedebi#481,telia_pdnconnectionsdefaultbearerebi#492,telia_pdnconnectionsapn#489,event_hour#467,telia_pdnconnectionsmobileipaddr#493,telia_pdnbearersdluserplanefteidteid#475L,telia_pdnbearersuluserplanefteidip#487,telia_pdnbearersqosqci#485,telia_pdnbearersbeareresttime#472L,telia_pdnbearersprioritylevel#484,child_instance_id#465,event_day#466,telia_pdnconnectionsselectionmode#495,telia_pdnbearersgbrguaranteedbitratedl#477L,time#497L,telia_pdnconnectionsapnambrul#491L,telia_pdnbearersgbrmaxbitrateul#480L,telia_pdnbearerspreemptionvulnerability#483,telia_pdnbearersuluserplanefteidteid#488L,telia_pdnbearersdluserplanefteidinterface#473,telia_pdnbearersgbrmaxbitratedl#479L,event_month#468,telia_pdnconnectionsmobileipv6addr#494,telia_pdnconnectionsapnambrdl#490L,telia_pdnbearersdluserplanefteidip#474,imsi#470L]
       :  +- RepartitionByExpression [imsi#470L,telia_psn#496L,time#497L], None
       :     +- Relation[child_instance_id#465,event_day#466,event_hour#467,event_month#468,event_year#469,imsi#470L,ingestion_timestamp#471,telia_pdnbearersbeareresttime#472L,telia_pdnbearersdluserplanefteidinterface#473,telia_pdnbearersdluserplanefteidip#474,telia_pdnbearersdluserplanefteidteid#475L,telia_pdnbearersebi#476,telia_pdnbearersgbrguaranteedbitratedl#477L,telia_pdnbearersgbrguaranteedbitrateul#478L,telia_pdnbearersgbrmaxbitratedl#479L,telia_pdnbearersgbrmaxbitrateul#480L,telia_pdnbearerslinkedebi#481,telia_pdnbearerspreemptioncapability#482,telia_pdnbearerspreemptionvulnerability#483,telia_pdnbearersprioritylevel#484,telia_pdnbearersqosqci#485,telia_pdnbearersuluserplanefteidinterface#486,telia_pdnbearersuluserplanefteidip#487,telia_pdnbearersuluserplanefteidteid#488L,telia_pdnconnectionsapn#489,telia_pdnconnectionsapnambrdl#490L,telia_pdnconnectionsapnambrul#491L,telia_pdnconnectionsdefaultbearerebi#492,telia_pdnconnectionsmobileipaddr#493,telia_pdnconnectionsmobileipv6addr#494,telia_pdnconnectionsselectionmode#495,telia_psn#496L,time#497L,ing_time#498L,job_instance_id#499] AvroRelation
       +- Project [psn_key#259L,event_datetime_EET#248,msisdn#258L,ueidentityimeitac#271,md5ueidentityimsi#257,ueidentityimsi#272L,locationinfoeci#252,time#265L,md5msisdn#256,ueidentityimei#270L,rattype_lu#261,locationinfotac#255]
          +- RepartitionByExpression [ueidentityimsi#272L,psn_key#259L,time#265L], None
             +- Project [time#265L,event_datetime_EET#248,md5ueidentityimsi#257,ueidentityimei#270L,ueidentityimeitac#271,msisdn#258L,md5msisdn#256,locationinfotac#255,locationinfoeci#252,rattype_lu#261,psn_key#259L,ueidentityimsi#272L,job_instance_id#251,event_year#281,event_month#282,event_day#283,event_hour#284]
                +- Join Inner, Some((((((job_instance_id#251 = job_instance_id#5) && (event_year#281 = event_year#3)) && (event_month#282 = event_month#2)) && (event_day#283 = event_day#0)) && (event_hour#284 = event_hour#1)))
                   :- Project [time#265L,event_datetime_EET#248,md5ueidentityimsi#257,ueidentityimei#270L,ueidentityimeitac#271,msisdn#258L,md5msisdn#256,locationinfotac#255,locationinfoeci#252,rattype_lu#261,psn_key#259L,ueidentityimsi#272L,job_instance_id#251,event_year#281,event_month#282,event_day#283,event_hour#284]
                   :  +- Relation[charginginfochargingcharacteristics#240,charginginfocharginggatewayaddress#241,charginginfocharginggatewayname#242,charginginfochargingid#243L,dlcontrolplanefteidinterface#244,dlcontrolplanefteidip#245,dlcontrolplanefteidteid#246L,endtime#247L,event_datetime_EET#248,gtpv2causevalue_lu#249,ingestion_timestamp#250,job_instance_id#251,locationinfoeci#252,locationinfomcc_lu#253,locationinfomnc_lu#254,locationinfotac#255,md5msisdn#256,md5ueidentityimsi#257,msisdn#258L,psn_key#259L,ptmsi#260L,rattype_lu#261,recordtype_lu#262,starttime#263L,status#264,time#265L,ueidentitygutimmegi#266,ueidentitygutiplmnid#267,ueidentitygutistmsimmec#268,ueidentitygutistmsimtsmi#269,ueidentityimei#270L,ueidentityimeitac#271,ueidentityimsi#272L,ueidentitystmsimmec#273L,ueidentitystmsimtmsi#274L,ulcontrolplanefteidinterface#275,ulcontrolplanefteidip#276,ulcontrolplanefteidteid#277L,vlaninfovlancfi_lu#278,vlaninfovlanid#279,vlaninfovlanpriority_lu#280,event_year#281,event_month#282,event_day#283,event_hour#284] ParquetRelation
                   +- Project [event_month#2,event_day#0,event_year#3,job_instance_id#5,event_hour#1]
                      +- InMemoryRelation [event_day#0,event_hour#1,event_month#2,event_year#3,ing_year#41,ing_month#42,ing_day#43,ing_hour#44,job_instance_id#5], true, 10000, StorageLevel(true, true, false, false, 1), TungstenAggregate(key=[event_day#0,event_hour#1,event_month#2,event_year#3,ing_year#41,ing_month#42,ing_day#43,ing_hour#44,job_instance_id#5], functions=[], output=[event_day#0,event_hour#1,event_month#2,event_year#3,ing_year#41,ing_month#42,ing_day#43,ing_hour#44,job_instance_id#5]), None

Asked by Varun, 11.08.2018

Comments
Could you show some code and tell us more about the data? This could be a Cartesian product on one or more duplicated keys, but without more information it is impossible to be sure.   - Oli, 11.08.2018
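
To illustrate the duplicated-key concern raised in the comment: an inner join emits, for each key, the product of that key's row counts on the two sides, so a single heavily duplicated key can dominate the output. Below is a minimal sketch in plain Python (not the Spark API). The function name `join_rows_per_key` and the sample keys are hypothetical and only model the three-column join condition (imsi, psn, time) from the plan.

```python
from collections import Counter


def join_rows_per_key(left_keys, right_keys):
    """Return {key: number of rows an inner join emits for that key}."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    # Each left row pairs with every right row sharing its key,
    # so the output per key is the product of the two counts.
    return {
        key: left_counts[key] * right_counts[key]
        for key in left_counts
        if key in right_counts
    }


# Hypothetical sample: one key is duplicated heavily on both sides.
left = [("imsi-1", 10, 100)] * 1000 + [("imsi-2", 20, 200), ("imsi-3", 30, 300)]
right = [("imsi-1", 10, 100)] * 500 + [("imsi-2", 20, 200)]

rows = join_rows_per_key(left, right)
hot_key, hot_rows = max(rows.items(), key=lambda item: item[1])
print(hot_key, hot_rows)
```

On real data the same check would be a group-by-and-count on the join columns of each data frame, sorted by count in descending order. In the statistics above, an average count near 1 combined with a standard deviation of 3.8 to 5.4 would be consistent with a few keys having very large counts, though that is only an inference from the posted numbers.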


Answers (1)


Since there is one slow task, it sounds like a single key holds most of the records. This is known as the skewed join problem; some approaches to solving it are described in detail here: Spark?.
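
One commonly used approach to a skewed join is key salting. It is shown here only as a sketch and is not necessarily what the linked post recommends. The skewed side gets a random salt appended to its key, the other side is replicated once per salt value, and the join runs on (key, salt), so the rows of a hot key are spread across several join groups. The code is plain Python that models the idea, not the Spark API; `NUM_SALTS`, `hash_join`, `salt_left` and `replicate_right` are hypothetical names.

```python
import random
from collections import Counter, defaultdict

NUM_SALTS = 8  # assumed number of salt values; tune to the degree of skew


def hash_join(left, right):
    """Inner join two lists of (key, value) pairs on key."""
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return [(key, lv, rv) for key, lv in left for rv in index.get(key, [])]


def salt_left(left, num_salts, rng):
    """Append a random salt to each key on the skewed side."""
    return [((key, rng.randrange(num_salts)), value) for key, value in left]


def replicate_right(right, num_salts):
    """Replicate each row of the other side once per salt value."""
    return [((key, salt), value) for key, value in right for salt in range(num_salts)]


# Hypothetical data: "hot" carries almost all rows on the left side.
left = [("hot", i) for i in range(1000)] + [("cold", 0)]
right = [("hot", "a"), ("hot", "b"), ("cold", "c")]

rng = random.Random(0)
salted_left = salt_left(left, NUM_SALTS, rng)
salted_right = replicate_right(right, NUM_SALTS)

# Join on the salted key, then drop the salt from the result.
salted_result = [
    (key, lv, rv) for (key, _salt), lv, rv in hash_join(salted_left, salted_right)
]
plain_result = hash_join(left, right)

# Largest number of left rows sharing one join key, before and after salting.
max_plain = max(Counter(key for key, _ in left).values())
max_salted = max(Counter(key for key, _ in salted_left).values())
print(max_plain, max_salted)
```

The trade-off is that the replicated side grows by a factor of `NUM_SALTS`, so salting pays off mainly when that side is the smaller one or when only the known hot keys are salted. Note also that salting spreads the work for a hot key but does not shrink the output: if the same key is heavily duplicated on both sides, the join still has to produce the full product of rows.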

Answered by DW., 12.08.2018