http读取doris数据源,写入到doris_dwd和kafka,flink报错空指针异常

Viewed 42

doris版本 2.0.4

streapark版本 2.0.0

kafka版本 kafka_2.12-2.4.1

image.png

flin报错详情

2024-03-19 09:33:02
java.lang.NullPointerException
at org.apache.doris.flink.rest.SchemaUtils.lambda$convertToSchema$0(SchemaUtils.java:36)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at org.apache.doris.flink.rest.SchemaUtils.convertToSchema(SchemaUtils.java:36)
at org.apache.doris.flink.datastream.ScalaValueReader.(ScalaValueReader.scala:127)
at org.apache.doris.flink.table.DorisRowDataInputFormat.open(DorisRowDataInputFormat.java:96)
at org.apache.doris.flink.table.DorisRowDataInputFormat.open(DorisRowDataInputFormat.java:45)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)

sql信息

set execution.runtime-mode = batch;
CREATE TABLE ods_lcyl_lis_lis_test_reg (
BARCODE STRING,
EXEC_SQN STRING,
HIS_ITEMCODE STRING,
ACCEPTLISTPRINTFLAG STRING,
ACUTE STRING,
ADJUSTSAMPLETESTERID STRING,
ADJUSTSAMPLETESTERNAME STRING,
BEDNO STRING,
BIRTHDAY BIGINT,
CLINICPRINTHOSTIP STRING,
CONFIRMDOCID STRING,
CONFIRMDOCIP STRING,
CONFIRMDOCNAME STRING,
CONFIRMDOCTIME BIGINT,
DIAGNOSE STRING,
DIAGNOSE_CYCLE STRING,
DOCTORDEPTID STRING,
DOCTORDEPTNAME STRING,
DOCTORID STRING,
DOCTORNAME STRING,
ERRHEALTH STRING,
EXPDATE STRING,
FACTORY STRING,
FEESEQID STRING,
FETUS STRING,
FIRSTTRIALDATE BIGINT,
GERMINSPECTIONTYPE STRING,
GESTATION STRING,
GETSAMPLEDATE BIGINT,
GETSAMPLETESTERID STRING,
GETSAMPLETESTERNAME STRING,
GROUPCODE STRING,
HEALTHDATE BIGINT,
HIS_ITEMNAME STRING,
HOSPITALID STRING,
IDENNO STRING,
INPATIENTNO STRING,
ISDISTINGUISH STRING ,
ISGREEN STRING,
ISHEALTH STRING ,
ISPDF STRING ,
ISWXHEALTH STRING,
ISYGHEALTH STRING,
ISYNPT STRING ,
ITEM_NUM DOUBLE ,
LASTSAMPLEACCEPTER STRING,
LASTSAMPLEACCEPTERID STRING,
LASTSAMPLEREACHTIME BIGINT,
LMP BIGINT,
LSPTESTFORM STRING,
MACHINECODE STRING,
MACHINE_MEMO STRING,
MEMO STRING,
NURSEID STRING,
NURSENAME STRING,
NURSE_CELL_CODE STRING,
PATIENTAGE STRING,
PATIENTNAME STRING,
PATIENTNAME1 STRING,
PATIENTNO STRING,
PATIENTSEX STRING,
PATIENTTYPE STRING,
PDFNAME STRING,
PEOPLETYPECODE STRING,
PEOPLETYPENAME STRING,
PRICE DOUBLE ,
PRINTGERMRECODEFLAG STRING,
PRINTTIME BIGINT,
READFLAG STRING ,
REPRINTFLAG STRING,
RESULTDATE BIGINT,
SAMPLEACCEPTER STRING,
SAMPLEACCEPTERID STRING,
SAMPLEACKER STRING,
SAMPLEACKTIME BIGINT,
SAMPLEPOSITION STRING,
SAMPLEREACHTIME BIGINT,
SAMPLESTATE STRING,
SAMPLETIME BIGINT,
SAMPLETYPE STRING,
STATE STRING,
TAKETYPE STRING,
TELNO STRING,
TESTDATE BIGINT,
TESTDESCRIBE STRING,
TESTFORMNO STRING,
TESTFORMNO1 STRING,
TESTTYPE STRING,
TUBECOLOR STRING,
UPDATESAMPLETIME STRING,
WARDID STRING,
WARDNAME STRING,
WBBARCODE STRING,
WEIGTH STRING,
YNPTDATE BIGINT,
ZJLX STRING,
__source_ts_ms BIGINT,
__op STRING,
__table STRING,
__db STRING,
__deleted STRING,
__dt STRING
) WITH (
'connector' = 'doris',
'fenodes' = '${doris.cluster.http}',
'table.identifier' = 'dw_ddbAEqIiCdH.ods_lcyl_all_1lis_test_reg',
'username' = 'root',
'password' = '000000',
'doris.request.tablet.size' = '1'
);

CREATE TABLE dwd_lcyl_lis_test_reg ( test_item_code STRING, electronicrequestnoteid STRING, bgdh STRING, machinecode STRING, groupcode STRING, testdate BIGINT, doctorid STRING, doctorname STRING, doctordeptid STRING, doctordeptname STRING, patientname1 STRING, patientsex STRING, patienttype STRING, test_method_code STRING, acute STRING, resultdate BIGINT, accept_specimen_dt BIGINT, test_technician_code STRING, getsampletesterid STRING, adjustsampletesterid STRING, state STRING, patientno STRING, test_item_name STRING, testformno STRING, patientage STRING, wardname STRING, bedno STRING, diagnose STRING, testformno1 STRING, item_num INT, price INT, testtype STRING, machine_memo STRING, testdescribe STRING, printtime BIGINT, sampleaccepter STRING, sampleaccepterid STRING, sampleacktime BIGINT, sampleacker STRING, hospitalid STRING, idenno STRING, telno STRING, factory STRING, sampleposition STRING, lastsamplereachtime BIGINT, lastsampleaccepter STRING, lastsampleaccepterid STRING, nurse_cell_code STRING, inpatientno STRING, clinicprinthostip STRING, firsttrialdate BIGINT, readflag STRING, updatesampletime STRING, reprintflag STRING, lsptestform STRING, confirmdocid STRING, confirmdocname STRING, confirmdoctime BIGINT, confirmdocip STRING, feeseqid STRING, birthday INT, weigth STRING, gestation STRING, fetus STRING, expdate STRING, lmp BIGINT, printgermrecodeflag STRING, isdistinguish STRING, ishealth STRING, wbbarcode STRING, taketype STRING, peopletypename STRING, peopletypecode STRING, iswxhealth STRING, errhealth STRING, pdfname STRING, isynpt STRING, ynptdate BIGINT, zjlx STRING, isyghealth STRING, patientname STRING, healthdate BIGINT, ispdf STRING, germinspectiontype STRING, reportaudit STRING, bbcyrqsj TIMESTAMP, jsbbrqsj TIMESTAMP, test_technician STRING, getsampletestername STRING, inspection_report_remarks STRING, bbzt STRING, _transform_dt BIGINT ) with ( 'connector' = 'doris', 'fenodes' = '${doris.cluster.http}', 'table.identifier' = 'dw_ddbAEqIiCdH.dwd_lcyl_lis_test_reg', 'username' = 'root', 'password' = '000000', 'sink.batch.size' = '10000',
'sink.max-retries' = '2',
'sink.batch.interval' = '20000',
'sink.properties.strip_outer_array' = 'true',
'sink.properties.format' = 'json',
'sink.properties.columns' = '');

CREATE TABLE dwd_lcyl_lis_test_reg_topic ( PRIMARY KEY (test_item_code,bgdh,electronicrequestnoteid) NOT ENFORCED, test_item_code STRING, electronicrequestnoteid STRING, bgdh STRING, machinecode STRING, groupcode STRING, testdate BIGINT, doctorid STRING, doctorname STRING, doctordeptid STRING, doctordeptname STRING, patientname1 STRING, patientsex STRING, patienttype STRING, test_method_code STRING, acute STRING, resultdate BIGINT, accept_specimen_dt BIGINT, test_technician_code STRING, getsampletesterid STRING, adjustsampletesterid STRING, state STRING, patientno STRING, test_item_name STRING, testformno STRING, patientage STRING, wardname STRING, bedno STRING, diagnose STRING, testformno1 STRING, item_num INT, price INT, testtype STRING, machine_memo STRING, testdescribe STRING, printtime BIGINT, sampleaccepter STRING, sampleaccepterid STRING, sampleacktime BIGINT, sampleacker STRING, hospitalid STRING, idenno STRING, telno STRING, factory STRING, sampleposition STRING, lastsamplereachtime BIGINT, lastsampleaccepter STRING, lastsampleaccepterid STRING, nurse_cell_code STRING, inpatientno STRING, clinicprinthostip STRING, firsttrialdate BIGINT, readflag STRING, updatesampletime STRING, reprintflag STRING, lsptestform STRING, confirmdocid STRING, confirmdocname STRING, confirmdoctime BIGINT, confirmdocip STRING, feeseqid STRING, birthday INT, weigth STRING, gestation STRING, fetus STRING, expdate STRING, lmp BIGINT, printgermrecodeflag STRING, isdistinguish STRING, ishealth STRING, wbbarcode STRING, taketype STRING, peopletypename STRING, peopletypecode STRING, iswxhealth STRING, errhealth STRING, pdfname STRING, isynpt STRING, ynptdate BIGINT, zjlx STRING, isyghealth STRING, patientname STRING, healthdate BIGINT, ispdf STRING, germinspectiontype STRING, reportaudit STRING, bbcyrqsj TIMESTAMP, jsbbrqsj TIMESTAMP, test_technician STRING, getsampletestername STRING, inspection_report_remarks STRING, bbzt STRING, _transform_dt BIGINT ) with ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = '${kafka.cluster}', 'topic' = 'datadev.dw_ddbAEqIiCdH.dwd_lcyl_lis_test_reg', 'key.format' = 'json', 'value.format' = 'json');
insert into dwd_lcyl_lis_test_reg select CAST(HIS_ITEMCODE as STRING), CAST(EXEC_SQN as STRING), CAST(BARCODE as STRING), CAST(MACHINECODE as STRING), CAST(GROUPCODE as STRING), TESTDATE, CAST(DOCTORID as STRING), CAST(DOCTORNAME as STRING), CAST(DOCTORDEPTID as STRING), CAST(DOCTORDEPTNAME as STRING), CAST(PATIENTNAME1 as STRING), CAST(PATIENTSEX as STRING), CAST(PATIENTTYPE as STRING), CAST(SAMPLETYPE as STRING), CAST(ACUTE as STRING), RESULTDATE, GETSAMPLEDATE, CAST(NURSEID as STRING), CAST(GETSAMPLETESTERID as STRING), CAST(ADJUSTSAMPLETESTERID as STRING), CAST(STATE as STRING), CAST(PATIENTNO as STRING), CAST(HIS_ITEMNAME as STRING), CAST(TESTFORMNO as STRING), CAST(PATIENTAGE as STRING), CAST(WARDNAME as STRING), CAST(BEDNO as STRING), CAST(DIAGNOSE as STRING), CAST(TESTFORMNO1 as STRING), CAST(ITEM_NUM as INT), CAST(PRICE as INT), CAST(TESTTYPE as STRING), CAST(MACHINE_MEMO as STRING), CAST(TESTDESCRIBE as STRING), PRINTTIME, CAST(SAMPLEACCEPTER as STRING), CAST(SAMPLEACCEPTERID as STRING), SAMPLEACKTIME, CAST(SAMPLEACKER as STRING), CAST(HOSPITALID as STRING), CAST(IDENNO as STRING), CAST(TELNO as STRING), CAST(FACTORY as STRING), CAST(SAMPLEPOSITION as STRING), LASTSAMPLEREACHTIME, CAST(LASTSAMPLEACCEPTER as STRING), CAST(LASTSAMPLEACCEPTERID as STRING), CAST(NURSE_CELL_CODE as STRING), CAST(INPATIENTNO as STRING), CAST(CLINICPRINTHOSTIP as STRING), FIRSTTRIALDATE, CAST(READFLAG as STRING), CAST(UPDATESAMPLETIME as STRING), CAST(REPRINTFLAG as STRING), CAST(LSPTESTFORM as STRING), CAST(CONFIRMDOCID as STRING), CAST(CONFIRMDOCNAME as STRING), CONFIRMDOCTIME, CAST(CONFIRMDOCIP as STRING), CAST(FEESEQID as STRING), CAST(BIRTHDAY as INT), CAST(WEIGTH as STRING), CAST(GESTATION as STRING), CAST(FETUS as STRING), CAST(EXPDATE as STRING), LMP, CAST(PRINTGERMRECODEFLAG as STRING), CAST(ISDISTINGUISH as STRING), CAST(ISHEALTH as STRING), CAST(WBBARCODE as STRING), CAST(TAKETYPE as STRING), CAST(PEOPLETYPENAME as STRING), CAST(PEOPLETYPECODE as STRING), CAST(ISWXHEALTH as STRING), CAST(ERRHEALTH as STRING), CAST(PDFNAME as STRING), CAST(ISYNPT as STRING), YNPTDATE, CAST(ZJLX as STRING), CAST(ISYGHEALTH as STRING), CAST(PATIENTNAME as STRING), HEALTHDATE, CAST(ISPDF as STRING), CAST(GERMINSPECTIONTYPE as STRING), CAST(ADJUSTSAMPLETESTERNAME as STRING), CAST(FROM_UNIXTIME(SAMPLETIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), CAST(FROM_UNIXTIME(SAMPLEREACHTIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), CAST(NURSENAME as STRING), CAST(GETSAMPLETESTERNAME as STRING), CAST(MEMO as STRING), CAST(SAMPLESTATE as STRING), CAST (UNIX_TIMESTAMP()*1000 as BIGINT) from ods_lcyl_lis_lis_test_reg where 1=1 ;

insert into dwd_lcyl_lis_test_reg_topic select CAST(HIS_ITEMCODE as STRING), CAST(EXEC_SQN as STRING), CAST(BARCODE as STRING), CAST(MACHINECODE as STRING), CAST(GROUPCODE as STRING), TESTDATE, CAST(DOCTORID as STRING), CAST(DOCTORNAME as STRING), CAST(DOCTORDEPTID as STRING), CAST(DOCTORDEPTNAME as STRING), CAST(PATIENTNAME1 as STRING), CAST(PATIENTSEX as STRING), CAST(PATIENTTYPE as STRING), CAST(SAMPLETYPE as STRING), CAST(ACUTE as STRING), RESULTDATE, GETSAMPLEDATE, CAST(NURSEID as STRING), CAST(GETSAMPLETESTERID as STRING), CAST(ADJUSTSAMPLETESTERID as STRING), CAST(STATE as STRING), CAST(PATIENTNO as STRING), CAST(HIS_ITEMNAME as STRING), CAST(TESTFORMNO as STRING), CAST(PATIENTAGE as STRING), CAST(WARDNAME as STRING), CAST(BEDNO as STRING), CAST(DIAGNOSE as STRING), CAST(TESTFORMNO1 as STRING), CAST(ITEM_NUM as INT), CAST(PRICE as INT), CAST(TESTTYPE as STRING), CAST(MACHINE_MEMO as STRING), CAST(TESTDESCRIBE as STRING), PRINTTIME, CAST(SAMPLEACCEPTER as STRING), CAST(SAMPLEACCEPTERID as STRING), SAMPLEACKTIME, CAST(SAMPLEACKER as STRING), CAST(HOSPITALID as STRING), CAST(IDENNO as STRING), CAST(TELNO as STRING), CAST(FACTORY as STRING), CAST(SAMPLEPOSITION as STRING), LASTSAMPLEREACHTIME, CAST(LASTSAMPLEACCEPTER as STRING), CAST(LASTSAMPLEACCEPTERID as STRING), CAST(NURSE_CELL_CODE as STRING), CAST(INPATIENTNO as STRING), CAST(CLINICPRINTHOSTIP as STRING), FIRSTTRIALDATE, CAST(READFLAG as STRING), CAST(UPDATESAMPLETIME as STRING), CAST(REPRINTFLAG as STRING), CAST(LSPTESTFORM as STRING), CAST(CONFIRMDOCID as STRING), CAST(CONFIRMDOCNAME as STRING), CONFIRMDOCTIME, CAST(CONFIRMDOCIP as STRING), CAST(FEESEQID as STRING), CAST(BIRTHDAY as INT), CAST(WEIGTH as STRING), CAST(GESTATION as STRING), CAST(FETUS as STRING), CAST(EXPDATE as STRING), LMP, CAST(PRINTGERMRECODEFLAG as STRING), CAST(ISDISTINGUISH as STRING), CAST(ISHEALTH as STRING), CAST(WBBARCODE as STRING), CAST(TAKETYPE as STRING), CAST(PEOPLETYPENAME as STRING), CAST(PEOPLETYPECODE as STRING), CAST(ISWXHEALTH as STRING), CAST(ERRHEALTH as STRING), CAST(PDFNAME as STRING), CAST(ISYNPT as STRING), YNPTDATE, CAST(ZJLX as STRING), CAST(ISYGHEALTH as STRING), CAST(PATIENTNAME as STRING), HEALTHDATE, CAST(ISPDF as STRING), CAST(GERMINSPECTIONTYPE as STRING), CAST(ADJUSTSAMPLETESTERNAME as STRING), CAST(FROM_UNIXTIME(SAMPLETIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), CAST(FROM_UNIXTIME(SAMPLEREACHTIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), CAST(NURSENAME as STRING), CAST(GETSAMPLETESTERNAME as STRING), CAST(MEMO as STRING), CAST(SAMPLESTATE as STRING), CAST (UNIX_TIMESTAMP()*1000 as BIGINT) from ods_lcyl_lis_lis_test_reg where 1=1 and (BARCODE IS NOT NULL) and (EXEC_SQN IS NOT NULL) and (HIS_ITEMCODE IS NOT NULL) ;

1 Answers

这个报错堆栈看不出什么有效信息,完整的堆栈有吗