doris版本 2.0.4
streampark版本 2.0.0
kafka版本 kafka_2.12-2.4.1
flink报错详情
2024-03-19 09:33:02
java.lang.NullPointerException
at org.apache.doris.flink.rest.SchemaUtils.lambda$convertToSchema$0(SchemaUtils.java:36)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at org.apache.doris.flink.rest.SchemaUtils.convertToSchema(SchemaUtils.java:36)
at org.apache.doris.flink.datastream.ScalaValueReader.(ScalaValueReader.scala:127)
at org.apache.doris.flink.table.DorisRowDataInputFormat.open(DorisRowDataInputFormat.java:96)
at org.apache.doris.flink.table.DorisRowDataInputFormat.open(DorisRowDataInputFormat.java:45)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
sql信息
-- Switch the Flink job to batch execution mode for the statements that follow.
set execution.runtime-mode = batch;
-- Doris-backed table that both INSERT statements in this script read from.
-- Column names are upper-case (plus the double-underscore metadata columns) and
-- are referenced with exactly this spelling by the SELECT lists below.
-- NOTE(review): the NullPointerException quoted at the top of this report is
-- raised in DorisRowDataInputFormat.open -> SchemaUtils.convertToSchema, i.e.
-- while opening a Doris source read such as this one. Presumably a column type
-- returned by Doris 2.0.4 is not known to the connector build in use -- confirm
-- the Flink Doris connector version against the Doris version.
CREATE TABLE ods_lcyl_lis_lis_test_reg (
    BARCODE STRING,
    EXEC_SQN STRING,
    HIS_ITEMCODE STRING,
    ACCEPTLISTPRINTFLAG STRING,
    ACUTE STRING,
    ADJUSTSAMPLETESTERID STRING,
    ADJUSTSAMPLETESTERNAME STRING,
    BEDNO STRING,
    BIRTHDAY BIGINT,
    CLINICPRINTHOSTIP STRING,
    CONFIRMDOCID STRING,
    CONFIRMDOCIP STRING,
    CONFIRMDOCNAME STRING,
    CONFIRMDOCTIME BIGINT,
    DIAGNOSE STRING,
    DIAGNOSE_CYCLE STRING,
    DOCTORDEPTID STRING,
    DOCTORDEPTNAME STRING,
    DOCTORID STRING,
    DOCTORNAME STRING,
    ERRHEALTH STRING,
    EXPDATE STRING,
    FACTORY STRING,
    FEESEQID STRING,
    FETUS STRING,
    FIRSTTRIALDATE BIGINT,
    GERMINSPECTIONTYPE STRING,
    GESTATION STRING,
    GETSAMPLEDATE BIGINT,
    GETSAMPLETESTERID STRING,
    GETSAMPLETESTERNAME STRING,
    GROUPCODE STRING,
    HEALTHDATE BIGINT,
    HIS_ITEMNAME STRING,
    HOSPITALID STRING,
    IDENNO STRING,
    INPATIENTNO STRING,
    ISDISTINGUISH STRING,
    ISGREEN STRING,
    ISHEALTH STRING,
    ISPDF STRING,
    ISWXHEALTH STRING,
    ISYGHEALTH STRING,
    ISYNPT STRING,
    ITEM_NUM DOUBLE,
    LASTSAMPLEACCEPTER STRING,
    LASTSAMPLEACCEPTERID STRING,
    LASTSAMPLEREACHTIME BIGINT,
    LMP BIGINT,
    LSPTESTFORM STRING,
    MACHINECODE STRING,
    MACHINE_MEMO STRING,
    MEMO STRING,
    NURSEID STRING,
    NURSENAME STRING,
    NURSE_CELL_CODE STRING,
    PATIENTAGE STRING,
    PATIENTNAME STRING,
    PATIENTNAME1 STRING,
    PATIENTNO STRING,
    PATIENTSEX STRING,
    PATIENTTYPE STRING,
    PDFNAME STRING,
    PEOPLETYPECODE STRING,
    PEOPLETYPENAME STRING,
    PRICE DOUBLE,
    PRINTGERMRECODEFLAG STRING,
    PRINTTIME BIGINT,
    READFLAG STRING,
    REPRINTFLAG STRING,
    RESULTDATE BIGINT,
    SAMPLEACCEPTER STRING,
    SAMPLEACCEPTERID STRING,
    SAMPLEACKER STRING,
    SAMPLEACKTIME BIGINT,
    SAMPLEPOSITION STRING,
    SAMPLEREACHTIME BIGINT,
    SAMPLESTATE STRING,
    SAMPLETIME BIGINT,
    SAMPLETYPE STRING,
    STATE STRING,
    TAKETYPE STRING,
    TELNO STRING,
    TESTDATE BIGINT,
    TESTDESCRIBE STRING,
    TESTFORMNO STRING,
    TESTFORMNO1 STRING,
    TESTTYPE STRING,
    TUBECOLOR STRING,
    UPDATESAMPLETIME STRING,
    WARDID STRING,
    WARDNAME STRING,
    WBBARCODE STRING,
    WEIGTH STRING,
    YNPTDATE BIGINT,
    ZJLX STRING,
    __source_ts_ms BIGINT,
    __op STRING,
    __table STRING,
    __db STRING,
    __deleted STRING,
    __dt STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '${doris.cluster.http}',
    'table.identifier' = 'dw_ddbAEqIiCdH.ods_lcyl_all_1lis_test_reg',
    'username' = 'root',
    'password' = '000000',
    'doris.request.tablet.size' = '1'
);
-- Doris-backed sink table written by the first INSERT statement of this script.
-- Rows are sent with the JSON stream-load properties listed in the WITH clause.
-- NOTE(review): 'sink.properties.columns' is set to an empty string -- confirm
-- that an empty column mapping is intended.
CREATE TABLE dwd_lcyl_lis_test_reg (
    test_item_code STRING,
    electronicrequestnoteid STRING,
    bgdh STRING,
    machinecode STRING,
    groupcode STRING,
    testdate BIGINT,
    doctorid STRING,
    doctorname STRING,
    doctordeptid STRING,
    doctordeptname STRING,
    patientname1 STRING,
    patientsex STRING,
    patienttype STRING,
    test_method_code STRING,
    acute STRING,
    resultdate BIGINT,
    accept_specimen_dt BIGINT,
    test_technician_code STRING,
    getsampletesterid STRING,
    adjustsampletesterid STRING,
    state STRING,
    patientno STRING,
    test_item_name STRING,
    testformno STRING,
    patientage STRING,
    wardname STRING,
    bedno STRING,
    diagnose STRING,
    testformno1 STRING,
    item_num INT,
    price INT,
    testtype STRING,
    machine_memo STRING,
    testdescribe STRING,
    printtime BIGINT,
    sampleaccepter STRING,
    sampleaccepterid STRING,
    sampleacktime BIGINT,
    sampleacker STRING,
    hospitalid STRING,
    idenno STRING,
    telno STRING,
    factory STRING,
    sampleposition STRING,
    lastsamplereachtime BIGINT,
    lastsampleaccepter STRING,
    lastsampleaccepterid STRING,
    nurse_cell_code STRING,
    inpatientno STRING,
    clinicprinthostip STRING,
    firsttrialdate BIGINT,
    readflag STRING,
    updatesampletime STRING,
    reprintflag STRING,
    lsptestform STRING,
    confirmdocid STRING,
    confirmdocname STRING,
    confirmdoctime BIGINT,
    confirmdocip STRING,
    feeseqid STRING,
    birthday INT,
    weigth STRING,
    gestation STRING,
    fetus STRING,
    expdate STRING,
    lmp BIGINT,
    printgermrecodeflag STRING,
    isdistinguish STRING,
    ishealth STRING,
    wbbarcode STRING,
    taketype STRING,
    peopletypename STRING,
    peopletypecode STRING,
    iswxhealth STRING,
    errhealth STRING,
    pdfname STRING,
    isynpt STRING,
    ynptdate BIGINT,
    zjlx STRING,
    isyghealth STRING,
    patientname STRING,
    healthdate BIGINT,
    ispdf STRING,
    germinspectiontype STRING,
    reportaudit STRING,
    bbcyrqsj TIMESTAMP,
    jsbbrqsj TIMESTAMP,
    test_technician STRING,
    getsampletestername STRING,
    inspection_report_remarks STRING,
    bbzt STRING,
    _transform_dt BIGINT
) WITH (
    'connector' = 'doris',
    'fenodes' = '${doris.cluster.http}',
    'table.identifier' = 'dw_ddbAEqIiCdH.dwd_lcyl_lis_test_reg',
    'username' = 'root',
    'password' = '000000',
    'sink.batch.size' = '10000',
    'sink.max-retries' = '2',
    'sink.batch.interval' = '20000',
    'sink.properties.strip_outer_array' = 'true',
    'sink.properties.format' = 'json',
    'sink.properties.columns' = ''
);
-- Upsert-Kafka sink table written by the second INSERT statement of this script.
-- Key and value are both encoded as JSON; the message key is made of the three
-- primary-key columns declared at the end of the column list.
CREATE TABLE dwd_lcyl_lis_test_reg_topic (
    test_item_code STRING,
    electronicrequestnoteid STRING,
    bgdh STRING,
    machinecode STRING,
    groupcode STRING,
    testdate BIGINT,
    doctorid STRING,
    doctorname STRING,
    doctordeptid STRING,
    doctordeptname STRING,
    patientname1 STRING,
    patientsex STRING,
    patienttype STRING,
    test_method_code STRING,
    acute STRING,
    resultdate BIGINT,
    accept_specimen_dt BIGINT,
    test_technician_code STRING,
    getsampletesterid STRING,
    adjustsampletesterid STRING,
    state STRING,
    patientno STRING,
    test_item_name STRING,
    testformno STRING,
    patientage STRING,
    wardname STRING,
    bedno STRING,
    diagnose STRING,
    testformno1 STRING,
    item_num INT,
    price INT,
    testtype STRING,
    machine_memo STRING,
    testdescribe STRING,
    printtime BIGINT,
    sampleaccepter STRING,
    sampleaccepterid STRING,
    sampleacktime BIGINT,
    sampleacker STRING,
    hospitalid STRING,
    idenno STRING,
    telno STRING,
    factory STRING,
    sampleposition STRING,
    lastsamplereachtime BIGINT,
    lastsampleaccepter STRING,
    lastsampleaccepterid STRING,
    nurse_cell_code STRING,
    inpatientno STRING,
    clinicprinthostip STRING,
    firsttrialdate BIGINT,
    readflag STRING,
    updatesampletime STRING,
    reprintflag STRING,
    lsptestform STRING,
    confirmdocid STRING,
    confirmdocname STRING,
    confirmdoctime BIGINT,
    confirmdocip STRING,
    feeseqid STRING,
    birthday INT,
    weigth STRING,
    gestation STRING,
    fetus STRING,
    expdate STRING,
    lmp BIGINT,
    printgermrecodeflag STRING,
    isdistinguish STRING,
    ishealth STRING,
    wbbarcode STRING,
    taketype STRING,
    peopletypename STRING,
    peopletypecode STRING,
    iswxhealth STRING,
    errhealth STRING,
    pdfname STRING,
    isynpt STRING,
    ynptdate BIGINT,
    zjlx STRING,
    isyghealth STRING,
    patientname STRING,
    healthdate BIGINT,
    ispdf STRING,
    germinspectiontype STRING,
    reportaudit STRING,
    bbcyrqsj TIMESTAMP,
    jsbbrqsj TIMESTAMP,
    test_technician STRING,
    getsampletestername STRING,
    inspection_report_remarks STRING,
    bbzt STRING,
    _transform_dt BIGINT,
    PRIMARY KEY (test_item_code, bgdh, electronicrequestnoteid) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = '${kafka.cluster}',
    'topic' = 'datadev.dw_ddbAEqIiCdH.dwd_lcyl_lis_test_reg',
    'key.format' = 'json',
    'value.format' = 'json'
);
-- Load every row of ods_lcyl_lis_lis_test_reg into the Doris sink
-- dwd_lcyl_lis_test_reg.
-- The target columns are listed explicitly so that each of the 92 expressions
-- is bound to a named sink column instead of relying on the sink table's
-- declaration order; the n-th SELECT expression feeds the n-th listed column.
-- The former "where 1=1" predicate was a no-op and has been dropped.
-- bbcyrqsj / jsbbrqsj are built from SAMPLETIME / SAMPLEREACHTIME by dividing by
-- 1000, formatting with FROM_UNIXTIME and casting the text to TIMESTAMP.
-- _transform_dt is UNIX_TIMESTAMP() * 1000, evaluated by the job.
-- NOTE(review): BIRTHDAY is declared BIGINT on the source and is cast to INT
-- here; if it holds epoch milliseconds like SAMPLETIME appears to, the value
-- will not fit in INT -- confirm the intended type.
-- NOTE(review): ITEM_NUM and PRICE are declared DOUBLE on the source and are
-- cast to INT here, which discards any fractional part -- confirm this is wanted.
INSERT INTO dwd_lcyl_lis_test_reg (
    test_item_code,
    electronicrequestnoteid,
    bgdh,
    machinecode,
    groupcode,
    testdate,
    doctorid,
    doctorname,
    doctordeptid,
    doctordeptname,
    patientname1,
    patientsex,
    patienttype,
    test_method_code,
    acute,
    resultdate,
    accept_specimen_dt,
    test_technician_code,
    getsampletesterid,
    adjustsampletesterid,
    state,
    patientno,
    test_item_name,
    testformno,
    patientage,
    wardname,
    bedno,
    diagnose,
    testformno1,
    item_num,
    price,
    testtype,
    machine_memo,
    testdescribe,
    printtime,
    sampleaccepter,
    sampleaccepterid,
    sampleacktime,
    sampleacker,
    hospitalid,
    idenno,
    telno,
    factory,
    sampleposition,
    lastsamplereachtime,
    lastsampleaccepter,
    lastsampleaccepterid,
    nurse_cell_code,
    inpatientno,
    clinicprinthostip,
    firsttrialdate,
    readflag,
    updatesampletime,
    reprintflag,
    lsptestform,
    confirmdocid,
    confirmdocname,
    confirmdoctime,
    confirmdocip,
    feeseqid,
    birthday,
    weigth,
    gestation,
    fetus,
    expdate,
    lmp,
    printgermrecodeflag,
    isdistinguish,
    ishealth,
    wbbarcode,
    taketype,
    peopletypename,
    peopletypecode,
    iswxhealth,
    errhealth,
    pdfname,
    isynpt,
    ynptdate,
    zjlx,
    isyghealth,
    patientname,
    healthdate,
    ispdf,
    germinspectiontype,
    reportaudit,
    bbcyrqsj,
    jsbbrqsj,
    test_technician,
    getsampletestername,
    inspection_report_remarks,
    bbzt,
    _transform_dt
)
SELECT
    CAST(HIS_ITEMCODE AS STRING),
    CAST(EXEC_SQN AS STRING),
    CAST(BARCODE AS STRING),
    CAST(MACHINECODE AS STRING),
    CAST(GROUPCODE AS STRING),
    TESTDATE,
    CAST(DOCTORID AS STRING),
    CAST(DOCTORNAME AS STRING),
    CAST(DOCTORDEPTID AS STRING),
    CAST(DOCTORDEPTNAME AS STRING),
    CAST(PATIENTNAME1 AS STRING),
    CAST(PATIENTSEX AS STRING),
    CAST(PATIENTTYPE AS STRING),
    CAST(SAMPLETYPE AS STRING),
    CAST(ACUTE AS STRING),
    RESULTDATE,
    GETSAMPLEDATE,
    CAST(NURSEID AS STRING),
    CAST(GETSAMPLETESTERID AS STRING),
    CAST(ADJUSTSAMPLETESTERID AS STRING),
    CAST(STATE AS STRING),
    CAST(PATIENTNO AS STRING),
    CAST(HIS_ITEMNAME AS STRING),
    CAST(TESTFORMNO AS STRING),
    CAST(PATIENTAGE AS STRING),
    CAST(WARDNAME AS STRING),
    CAST(BEDNO AS STRING),
    CAST(DIAGNOSE AS STRING),
    CAST(TESTFORMNO1 AS STRING),
    CAST(ITEM_NUM AS INT),
    CAST(PRICE AS INT),
    CAST(TESTTYPE AS STRING),
    CAST(MACHINE_MEMO AS STRING),
    CAST(TESTDESCRIBE AS STRING),
    PRINTTIME,
    CAST(SAMPLEACCEPTER AS STRING),
    CAST(SAMPLEACCEPTERID AS STRING),
    SAMPLEACKTIME,
    CAST(SAMPLEACKER AS STRING),
    CAST(HOSPITALID AS STRING),
    CAST(IDENNO AS STRING),
    CAST(TELNO AS STRING),
    CAST(FACTORY AS STRING),
    CAST(SAMPLEPOSITION AS STRING),
    LASTSAMPLEREACHTIME,
    CAST(LASTSAMPLEACCEPTER AS STRING),
    CAST(LASTSAMPLEACCEPTERID AS STRING),
    CAST(NURSE_CELL_CODE AS STRING),
    CAST(INPATIENTNO AS STRING),
    CAST(CLINICPRINTHOSTIP AS STRING),
    FIRSTTRIALDATE,
    CAST(READFLAG AS STRING),
    CAST(UPDATESAMPLETIME AS STRING),
    CAST(REPRINTFLAG AS STRING),
    CAST(LSPTESTFORM AS STRING),
    CAST(CONFIRMDOCID AS STRING),
    CAST(CONFIRMDOCNAME AS STRING),
    CONFIRMDOCTIME,
    CAST(CONFIRMDOCIP AS STRING),
    CAST(FEESEQID AS STRING),
    CAST(BIRTHDAY AS INT),
    CAST(WEIGTH AS STRING),
    CAST(GESTATION AS STRING),
    CAST(FETUS AS STRING),
    CAST(EXPDATE AS STRING),
    LMP,
    CAST(PRINTGERMRECODEFLAG AS STRING),
    CAST(ISDISTINGUISH AS STRING),
    CAST(ISHEALTH AS STRING),
    CAST(WBBARCODE AS STRING),
    CAST(TAKETYPE AS STRING),
    CAST(PEOPLETYPENAME AS STRING),
    CAST(PEOPLETYPECODE AS STRING),
    CAST(ISWXHEALTH AS STRING),
    CAST(ERRHEALTH AS STRING),
    CAST(PDFNAME AS STRING),
    CAST(ISYNPT AS STRING),
    YNPTDATE,
    CAST(ZJLX AS STRING),
    CAST(ISYGHEALTH AS STRING),
    CAST(PATIENTNAME AS STRING),
    HEALTHDATE,
    CAST(ISPDF AS STRING),
    CAST(GERMINSPECTIONTYPE AS STRING),
    CAST(ADJUSTSAMPLETESTERNAME AS STRING),
    CAST(FROM_UNIXTIME(SAMPLETIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP),
    CAST(FROM_UNIXTIME(SAMPLEREACHTIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP),
    CAST(NURSENAME AS STRING),
    CAST(GETSAMPLETESTERNAME AS STRING),
    CAST(MEMO AS STRING),
    CAST(SAMPLESTATE AS STRING),
    CAST(UNIX_TIMESTAMP()*1000 AS BIGINT)
FROM ods_lcyl_lis_lis_test_reg;
-- Publish rows of ods_lcyl_lis_lis_test_reg to the upsert-kafka sink
-- dwd_lcyl_lis_test_reg_topic.
-- Only rows whose BARCODE, EXEC_SQN and HIS_ITEMCODE are all non-null are sent;
-- those three source columns feed the sink columns bgdh, electronicrequestnoteid
-- and test_item_code respectively.
-- The target columns are listed explicitly so that each of the 92 expressions
-- is bound to a named sink column instead of relying on the sink table's
-- declaration order; the n-th SELECT expression feeds the n-th listed column.
-- The former "1=1" conjunct was a no-op and has been dropped.
-- bbcyrqsj / jsbbrqsj are built from SAMPLETIME / SAMPLEREACHTIME by dividing by
-- 1000, formatting with FROM_UNIXTIME and casting the text to TIMESTAMP.
-- _transform_dt is UNIX_TIMESTAMP() * 1000, evaluated by the job.
-- NOTE(review): BIRTHDAY is declared BIGINT on the source and is cast to INT
-- here; if it holds epoch milliseconds like SAMPLETIME appears to, the value
-- will not fit in INT -- confirm the intended type.
-- NOTE(review): ITEM_NUM and PRICE are declared DOUBLE on the source and are
-- cast to INT here, which discards any fractional part -- confirm this is wanted.
INSERT INTO dwd_lcyl_lis_test_reg_topic (
    test_item_code,
    electronicrequestnoteid,
    bgdh,
    machinecode,
    groupcode,
    testdate,
    doctorid,
    doctorname,
    doctordeptid,
    doctordeptname,
    patientname1,
    patientsex,
    patienttype,
    test_method_code,
    acute,
    resultdate,
    accept_specimen_dt,
    test_technician_code,
    getsampletesterid,
    adjustsampletesterid,
    state,
    patientno,
    test_item_name,
    testformno,
    patientage,
    wardname,
    bedno,
    diagnose,
    testformno1,
    item_num,
    price,
    testtype,
    machine_memo,
    testdescribe,
    printtime,
    sampleaccepter,
    sampleaccepterid,
    sampleacktime,
    sampleacker,
    hospitalid,
    idenno,
    telno,
    factory,
    sampleposition,
    lastsamplereachtime,
    lastsampleaccepter,
    lastsampleaccepterid,
    nurse_cell_code,
    inpatientno,
    clinicprinthostip,
    firsttrialdate,
    readflag,
    updatesampletime,
    reprintflag,
    lsptestform,
    confirmdocid,
    confirmdocname,
    confirmdoctime,
    confirmdocip,
    feeseqid,
    birthday,
    weigth,
    gestation,
    fetus,
    expdate,
    lmp,
    printgermrecodeflag,
    isdistinguish,
    ishealth,
    wbbarcode,
    taketype,
    peopletypename,
    peopletypecode,
    iswxhealth,
    errhealth,
    pdfname,
    isynpt,
    ynptdate,
    zjlx,
    isyghealth,
    patientname,
    healthdate,
    ispdf,
    germinspectiontype,
    reportaudit,
    bbcyrqsj,
    jsbbrqsj,
    test_technician,
    getsampletestername,
    inspection_report_remarks,
    bbzt,
    _transform_dt
)
SELECT
    CAST(HIS_ITEMCODE AS STRING),
    CAST(EXEC_SQN AS STRING),
    CAST(BARCODE AS STRING),
    CAST(MACHINECODE AS STRING),
    CAST(GROUPCODE AS STRING),
    TESTDATE,
    CAST(DOCTORID AS STRING),
    CAST(DOCTORNAME AS STRING),
    CAST(DOCTORDEPTID AS STRING),
    CAST(DOCTORDEPTNAME AS STRING),
    CAST(PATIENTNAME1 AS STRING),
    CAST(PATIENTSEX AS STRING),
    CAST(PATIENTTYPE AS STRING),
    CAST(SAMPLETYPE AS STRING),
    CAST(ACUTE AS STRING),
    RESULTDATE,
    GETSAMPLEDATE,
    CAST(NURSEID AS STRING),
    CAST(GETSAMPLETESTERID AS STRING),
    CAST(ADJUSTSAMPLETESTERID AS STRING),
    CAST(STATE AS STRING),
    CAST(PATIENTNO AS STRING),
    CAST(HIS_ITEMNAME AS STRING),
    CAST(TESTFORMNO AS STRING),
    CAST(PATIENTAGE AS STRING),
    CAST(WARDNAME AS STRING),
    CAST(BEDNO AS STRING),
    CAST(DIAGNOSE AS STRING),
    CAST(TESTFORMNO1 AS STRING),
    CAST(ITEM_NUM AS INT),
    CAST(PRICE AS INT),
    CAST(TESTTYPE AS STRING),
    CAST(MACHINE_MEMO AS STRING),
    CAST(TESTDESCRIBE AS STRING),
    PRINTTIME,
    CAST(SAMPLEACCEPTER AS STRING),
    CAST(SAMPLEACCEPTERID AS STRING),
    SAMPLEACKTIME,
    CAST(SAMPLEACKER AS STRING),
    CAST(HOSPITALID AS STRING),
    CAST(IDENNO AS STRING),
    CAST(TELNO AS STRING),
    CAST(FACTORY AS STRING),
    CAST(SAMPLEPOSITION AS STRING),
    LASTSAMPLEREACHTIME,
    CAST(LASTSAMPLEACCEPTER AS STRING),
    CAST(LASTSAMPLEACCEPTERID AS STRING),
    CAST(NURSE_CELL_CODE AS STRING),
    CAST(INPATIENTNO AS STRING),
    CAST(CLINICPRINTHOSTIP AS STRING),
    FIRSTTRIALDATE,
    CAST(READFLAG AS STRING),
    CAST(UPDATESAMPLETIME AS STRING),
    CAST(REPRINTFLAG AS STRING),
    CAST(LSPTESTFORM AS STRING),
    CAST(CONFIRMDOCID AS STRING),
    CAST(CONFIRMDOCNAME AS STRING),
    CONFIRMDOCTIME,
    CAST(CONFIRMDOCIP AS STRING),
    CAST(FEESEQID AS STRING),
    CAST(BIRTHDAY AS INT),
    CAST(WEIGTH AS STRING),
    CAST(GESTATION AS STRING),
    CAST(FETUS AS STRING),
    CAST(EXPDATE AS STRING),
    LMP,
    CAST(PRINTGERMRECODEFLAG AS STRING),
    CAST(ISDISTINGUISH AS STRING),
    CAST(ISHEALTH AS STRING),
    CAST(WBBARCODE AS STRING),
    CAST(TAKETYPE AS STRING),
    CAST(PEOPLETYPENAME AS STRING),
    CAST(PEOPLETYPECODE AS STRING),
    CAST(ISWXHEALTH AS STRING),
    CAST(ERRHEALTH AS STRING),
    CAST(PDFNAME AS STRING),
    CAST(ISYNPT AS STRING),
    YNPTDATE,
    CAST(ZJLX AS STRING),
    CAST(ISYGHEALTH AS STRING),
    CAST(PATIENTNAME AS STRING),
    HEALTHDATE,
    CAST(ISPDF AS STRING),
    CAST(GERMINSPECTIONTYPE AS STRING),
    CAST(ADJUSTSAMPLETESTERNAME AS STRING),
    CAST(FROM_UNIXTIME(SAMPLETIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP),
    CAST(FROM_UNIXTIME(SAMPLEREACHTIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP),
    CAST(NURSENAME AS STRING),
    CAST(GETSAMPLETESTERNAME AS STRING),
    CAST(MEMO AS STRING),
    CAST(SAMPLESTATE AS STRING),
    CAST(UNIX_TIMESTAMP()*1000 AS BIGINT)
FROM ods_lcyl_lis_lis_test_reg
WHERE BARCODE IS NOT NULL
  AND EXEC_SQN IS NOT NULL
  AND HIS_ITEMCODE IS NOT NULL;